Skip to content

Commit

Permalink
Begin adding health tracking integration, sync/bench harness support/…
Browse files Browse the repository at this point in the history
…fix double use of blob wrapper
  • Loading branch information
lilith committed May 24, 2024
1 parent 5e70dc5 commit 1d82686
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 78 deletions.
22 changes: 22 additions & 0 deletions .cursorignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Add directories or file patterns to ignore during indexing (e.g. foo/ or *.csv)
*.txt
*.dll
*.pdb
*.user
*.png
*.jpg
*.jpeg
*.bmp
*.ico
*.webp
*.nupkg
*.json
*.info
.idea/
.vscode/
bin/
examples/
NuGetPackages/
obj/
packages/

7 changes: 6 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,10 @@
],
"cSpell.ignoreWords": [
"sthree"
]
],
"editor.fontSize": 14,
"window.zoomLevel": 1,
"window.zoomPerWindow": true,
"editor.mouseWheelZoom": true,
"terminal.integrated.mouseWheelZoom": true
}
6 changes: 6 additions & 0 deletions src/Imazen.Routing/Engine/RoutingEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public bool MightHandleRequest<TQ>(string path, TQ query) where TQ : IReadOnlyQu
}


/// <summary>
/// Errors if the route isn't a cachable blob; returns null if there's no match.
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async ValueTask<CodeResult<ICacheableBlobPromise>?> RouteToPromiseAsync(MutableRequest request, CancellationToken cancellationToken = default)
{
// log info about the request
Expand Down
108 changes: 64 additions & 44 deletions src/Imazen.Routing/Promises/Pipelines/CacheEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Imazen.Abstractions.Resulting;
using Imazen.Common.Concurrency;
using Imazen.Common.Concurrency.BoundedTaskCollection;
using Imazen.Routing.Health;
using Imazen.Routing.Helpers;
using Imazen.Routing.HttpAbstractions;
using Imazen.Routing.Requests;
Expand All @@ -21,6 +22,7 @@ namespace Imazen.Routing.Promises.Pipelines;
public class CacheEngine: IBlobPromisePipeline
{


public CacheEngine(IBlobPromisePipeline? next, CacheEngineOptions options)
{
Options = options;
Expand All @@ -30,6 +32,7 @@ public CacheEngine(IBlobPromisePipeline? next, CacheEngineOptions options)
{
Locks = new AsyncLockProvider();
}
HealthTracker = options.HealthTracker as CacheHealthTracker;
}

public async ValueTask<CodeResult<ICacheableBlobPromise>> GetFinalPromiseAsync(ICacheableBlobPromise promise, IBlobRequestRouter router,
Expand All @@ -49,7 +52,7 @@ public async ValueTask<CodeResult<ICacheableBlobPromise>> GetFinalPromiseAsync(I
return CodeResult<ICacheableBlobPromise>.Ok(
new ServerlessCachePromise(wrappedPromise.FinalRequest, wrappedPromise, this));
}

private CacheHealthTracker? HealthTracker { get; }
private IBlobPromisePipeline? Next { get; }
private AsyncLockProvider? Locks { get; }

Expand All @@ -70,7 +73,7 @@ private async Task FinishUpload(IBlobCacheRequest cacheReq, ICacheableBlobPromis
await Task.WhenAll(Options.SaveToCaches.Select(x => x.CachePut(cacheEventDetails, cancellationToken)));
}

public async ValueTask<CodeResult<IBlobWrapper>> Fetch(ICacheableBlobPromise promise,IBlobRequestRouter router, CancellationToken cancellationToken = default)
internal async ValueTask<CodeResult<IBlobWrapper>> Fetch(ICacheableBlobPromise promise,IBlobRequestRouter router, CancellationToken cancellationToken = default)
{
if (!promise.ReadyToWriteCacheKeyBasisData)
{
Expand All @@ -95,16 +98,15 @@ public async ValueTask<CodeResult<IBlobWrapper>> Fetch(ICacheableBlobPromise pro
return await FetchInner(cacheRequest, promise, router, cancellationToken);
}
}


public async ValueTask<CodeResult<IBlobWrapper>> FetchInner(IBlobCacheRequest cacheRequest, ICacheableBlobPromise promise, IBlobRequestRouter router, CancellationToken cancellationToken = default)

private async ValueTask<CodeResult<IBlobWrapper>> FetchInner(IBlobCacheRequest cacheRequest, ICacheableBlobPromise promise, IBlobRequestRouter router, CancellationToken cancellationToken = default)
{
// First check the upload queue.
if (Options.UploadQueue?.TryGet(cacheRequest.CacheKeyHashString, out var uploadTask) == true)
{
Options.Logger.LogTrace("Located requested resource from the upload queue {CacheKeyHashString}", cacheRequest.CacheKeyHashString);
return CodeResult<IBlobWrapper>.Ok(uploadTask.Blob);
return CodeResult<IBlobWrapper>.Ok(uploadTask.Blob.ForkReference());
}
// Then check the caches
List<KeyValuePair<IBlobCache,Task<CacheFetchResult>>>? allFetchAttempts = null;
Expand Down Expand Up @@ -266,9 +268,9 @@ private void LogFetchTaskStatus(bool isFresh,
LogFetchTaskStatus(isFresh, cacheHit, fetchTasks);
}

if (Options.UploadQueue == null)
if (Options.UploadQueue == null && !Options.DelayRequestUntilUploadsComplete)
{
Log.LogWarning("No upload queue configured");
Log.LogWarning("No upload queue configured, and synchronous mode disabled. Not saving to any caches.");
return null;
}

Expand Down Expand Up @@ -323,23 +325,27 @@ private void LogFetchTaskStatus(bool isFresh,



private async Task BufferAndEnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper blob, bool isFresh,
private async ValueTask BufferAndEnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper blob, bool isFresh,
IBlobCache? cacheHit, List<KeyValuePair<IBlobCache, Task<CacheFetchResult>>>? fetchTasks, CancellationToken bufferCancellationToken = default)
{
if (EnqueueSaveToCaches(cacheRequest, blob, isFresh, cacheHit, fetchTasks))
// Here's also a good place to handle the cachefetchresults; \
// HealthTracker[cacheHit].ReportBehavior();
//

if (await EnqueueSaveToCaches(cacheRequest, blob, isFresh, cacheHit, fetchTasks))
{
await blob.EnsureReusable(bufferCancellationToken);
Log.LogTrace("Called EnsureReusable on {CacheKeyHashString}", cacheRequest.CacheKeyHashString);
}
}

private bool EnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper mainBlob, bool isFresh,
private ValueTask<bool> EnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper mainBlob, bool isFresh,
IBlobCache? cacheHit, List<KeyValuePair<IBlobCache,Task<CacheFetchResult>>>? fetchTasks)
{

var cachesToSaveTo = GetUploadCacheCandidates(isFresh, ref cacheHit, fetchTasks);

if (cachesToSaveTo == null || Options.UploadQueue == null) return false; // Nothing to do
if (cachesToSaveTo == null || (Options.UploadQueue == null && !Options.DelayRequestUntilUploadsComplete)) return new ValueTask<bool>(false); // Nothing to do

var blob = mainBlob.ForkReference();
mainBlob.IndicateInterest();
Expand All @@ -360,39 +366,56 @@ private bool EnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper ma
throw new InvalidOperationException();
}


var enqueueResult = Options.UploadQueue.Queue(new BlobTaskItem(cacheRequest.CacheKeyHashString,blob), async (taskItem, cancellationToken) =>
var blobTaskItem = new BlobTaskItem(cacheRequest.CacheKeyHashString,blob);

Task<PutResult[]> BulkUploader(BlobTaskItem taskItem, CancellationToken cancellationToken)
{
// We need to dispose of the blob wrapper after all uploads are complete.
using (taskItem.Blob)
{
var tasks = cachesToSaveTo.Select(async cache =>
{
var waitingInQueue = DateTime.UtcNow - taskItem.JobCreatedAt;
var tasks = cachesToSaveTo.Select(PerCacheUpload)
.ToArray();
return Task.WhenAll(tasks);

var sw = Stopwatch.StartNew();
try
{
Log.LogTrace("[put started] CachePut {key} to {CacheName}", taskItem.UniqueKey,
cache.UniqueName);
var result = await cache.CachePut(eventDetails, cancellationToken);
sw.Stop();
var r = new PutResult(cache, eventDetails, result, null, sw.Elapsed, waitingInQueue);
LogPutResult(r);
return r;
}
catch (Exception e)
{
sw.Stop();
var r = new PutResult(cache, eventDetails, null, e, sw.Elapsed, waitingInQueue);
LogPutResult(r);
return r;
}
async Task<PutResult> PerCacheUpload(IBlobCache cache)
{
var waitingInQueue = DateTime.UtcNow - taskItem.JobCreatedAt;

var sw = Stopwatch.StartNew();
try
{
Log.LogTrace("[put started] CachePut {key} to {CacheName}", taskItem.UniqueKey, cache.UniqueName);
var result = await cache.CachePut(eventDetails, cancellationToken);
sw.Stop();
var r = new PutResult(cache, eventDetails, result, null, sw.Elapsed, waitingInQueue);
LogPutResult(r);
return r;
}
).ToArray();
HandleUploadAnswers(await Task.WhenAll(tasks));
catch (Exception e)
{
sw.Stop();
var r = new PutResult(cache, eventDetails, null, e, sw.Elapsed, waitingInQueue);
LogPutResult(r);
return r;
}
}
}
});
}

async Task BulkUploaderAsync(BlobTaskItem item, CancellationToken ct) => HandleUploadAnswers(await BulkUploader(item, ct));

if (Options.DelayRequestUntilUploadsComplete)
{
var uploadTask = BulkUploader(blobTaskItem, default);
var finalTask = uploadTask.ContinueWith(t =>
{
HandleUploadAnswers(t.Result);
return true;
}, TaskContinuationOptions.ExecuteSynchronously);
return new ValueTask<bool>(finalTask);
}
if (Options.UploadQueue == null) return new ValueTask<bool>(false);
var enqueueResult = Options.UploadQueue.Queue(blobTaskItem, BulkUploaderAsync);
if (enqueueResult == TaskEnqueueResult.QueueFull)
{
Log.LogWarning("[CACHE PUT ERROR] Upload queue is full, not enqueuing {CacheKeyHashString} for upload to {Caches}", cacheRequest.CacheKeyHashString, string.Join(", ", cachesToSaveTo.Select(x => x.UniqueName)));
Expand All @@ -409,7 +432,8 @@ private bool EnqueueSaveToCaches(IBlobCacheRequest cacheRequest, IBlobWrapper ma
{
Log.LogTrace("Enqueued {CacheKeyHashString} for upload to {Caches}", cacheRequest.CacheKeyHashString, string.Join(", ", cachesToSaveTo.Select(x => x.UniqueName)));
}
return enqueueResult == TaskEnqueueResult.Enqueued;

return new ValueTask<bool>(enqueueResult == TaskEnqueueResult.Enqueued);
}
record struct PutResult(IBlobCache Cache, CacheEventDetails EventDetails, CodeResult? Result, Exception? Exception, TimeSpan Executing, TimeSpan Waiting);

Expand All @@ -432,13 +456,9 @@ private void LogPutResult(PutResult result)

private void HandleUploadAnswers(PutResult[] results)
{
//TODO?
//TODO? Cache put failures should probably affect health??

}




}

internal record ServerlessCachePromise(IRequestSnapshot FinalRequest, ICacheableBlobPromise FreshPromise, CacheEngine CacheEngine): ICacheableBlobPromise
Expand Down
15 changes: 10 additions & 5 deletions src/Imazen.Routing/Promises/Pipelines/CacheEngineOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Imazen.Abstractions.Blobs;
using Imazen.Abstractions.Logging;
using Imazen.Common.Concurrency.BoundedTaskCollection;
using Imazen.Routing.Health;
using Imazen.Routing.Requests;

namespace Imazen.Routing.Promises.Pipelines;
Expand All @@ -20,15 +21,21 @@ public CacheEngineOptions(List<IBlobCache> simultaneousFetchAndPut, BoundedTaskC
// Each cache group is a list of caches that can be queried in parallel
public required List<List<IBlobCache>> SeriesOfCacheGroups { get; init; }

// All caches we want to enable writing to
public required List<IBlobCache> SaveToCaches { get; init; }

[Obsolete("Use the parameterized one local to the request")]
public IBlobRequestRouter? RequestRouter { get; init; }

// TODO: maybe remove?
public required IReusableBlobFactory BlobFactory { get; init; }

public required BoundedTaskCollection<BlobTaskItem>? UploadQueue { get; init; }

public required object? HealthTracker { get; init; }

/// <summary>
/// Disables background upload queue, and instead uploads blobs immediately before responding to the client.
/// </summary>
public bool DelayRequestUntilUploadsComplete { get; init; }

public required IReLogger Logger { get; init; }

/// <summary>
Expand All @@ -40,6 +47,4 @@ public CacheEngineOptions(List<IBlobCache> simultaneousFetchAndPut, BoundedTaskC
/// How long to wait for fetching and generation of the same request by another thread.
/// </summary>
public int LockTimeoutMs { get; init; } = 2000;


}
26 changes: 0 additions & 26 deletions src/Imazen.Routing/Promises/Pipelines/PipelineBuilder.cs

This file was deleted.

12 changes: 10 additions & 2 deletions src/Imazen.Routing/Serving/ImageServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Imazen.Common.Licensing;
using Imazen.Routing.Caching;
using Imazen.Routing.Engine;
using Imazen.Routing.Health;
using Imazen.Routing.Helpers;
using Imazen.Routing.HttpAbstractions;
using Imazen.Routing.Layers;
Expand All @@ -37,6 +38,7 @@ internal class ImageServer<TRequest, TResponse, TContext> : IImageServer<TReques
private readonly BoundedTaskCollection<BlobTaskItem> uploadQueue;
private readonly bool shutdownRegisteredServices;
private readonly IImageServerContainer container;
private readonly CacheHealthTracker cacheHealthTracker;
public ImageServer(IImageServerContainer container,
ILicenseChecker licenseChecker,
LicenseOptions licenseOptions,
Expand Down Expand Up @@ -95,11 +97,15 @@ public ImageServer(IImageServerContainer container,
}

var allCachesExceptMemory = allCaches?.Where(c => c != memoryCache)?.ToList();

cacheHealthTracker = container.GetService<CacheHealthTracker>();
cacheHealthTracker ??= new CacheHealthTracker(logger);

var watermarkingLogic = container.GetService<WatermarkingLogicOptions>() ??
new WatermarkingLogicOptions(null, null);
var sourceCacheOptions = new CacheEngineOptions
{
HealthTracker = cacheHealthTracker,
SeriesOfCacheGroups =
[
..new[] { [memoryCache], allCachesExceptMemory ?? [] }
Expand Down Expand Up @@ -304,16 +310,18 @@ await SmallHttpResponse.Text(404, "The specified resource does not exist.\r\n" +

}

public Task StartAsync(CancellationToken cancellationToken)
public async Task StartAsync(CancellationToken cancellationToken)
{
return uploadQueue.StartAsync(cancellationToken);
await uploadQueue.StartAsync(cancellationToken);
await cacheHealthTracker.StartAsync(cancellationToken);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
//TODO: error handling or no?
//await uploadCancellationTokenSource.CancelAsync();
await uploadQueue.StopAsync(cancellationToken);
await cacheHealthTracker.StopAsync(cancellationToken);
if (shutdownRegisteredServices)
{
var services = this.container.GetInstanceOfEverythingLocal<IHostedService>();
Expand Down

0 comments on commit 1d82686

Please sign in to comment.