Skip to content

Commit

Permalink
Parse large JSON module dictionaries in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
HebaruSan committed Oct 6, 2023
1 parent eb3045e commit b3ae777
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 12 deletions.
49 changes: 49 additions & 0 deletions Core/Converters/JsonParallelDictionaryConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using System.Linq;
using System.Collections.Generic;
using System.Collections.Concurrent;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

using CKAN.Extensions;

namespace CKAN
{
    /// <summary>
    /// A converter that loads a dictionary in parallel,
    /// use with large collections of complex objects for a speed boost.
    /// Keys are the JSON property names; each property value is converted to V.
    /// Read-only: CanWrite is false and WriteJson always throws.
    /// </summary>
    /// <typeparam name="V">Type of the values stored in the dictionary</typeparam>
    public class JsonParallelDictionaryConverter<V> : JsonConverter
    {
        /// <summary>
        /// Load the JSON object at the reader's current position and
        /// convert its properties into a dictionary of string to V.
        /// </summary>
        /// <param name="reader">Reader positioned at the value to convert</param>
        /// <param name="objectType">Unused</param>
        /// <param name="existingValue">Unused</param>
        /// <param name="serializer">
        /// Calling serializer; if its Context.Context is an IProgress of int,
        /// that object receives percent-complete updates
        /// </param>
        /// <returns>The populated dictionary, or null if the JSON value is null</returns>
        public override object ReadJson(JsonReader     reader,
                                        Type           objectType,
                                        object         existingValue,
                                        JsonSerializer serializer)
        {
            // An explicit JSON null has no properties to parse and JObject.Load
            // would throw on it, so hand it back as a null dictionary instead
            if (reader.TokenType == JsonToken.Null)
            {
                return null;
            }
            // Materialize the properties up front so the total count
            // is known for the progress calculation
            return ParseWithProgress(JObject.Load(reader)
                                            .Properties()
                                            .ToArray(),
                                     serializer);
        }

        /// <summary>
        /// Convert each property to a key/value pair via PLINQ and collect them
        /// </summary>
        /// <param name="properties">Properties of the JSON object being read</param>
        /// <param name="serializer">Only its Context.Context is consulted, as an optional progress sink</param>
        /// <returns>Dictionary from property name to converted value</returns>
        private object ParseWithProgress(JProperty[]    properties,
                                         JsonSerializer serializer)
            // 'true' requests a load-balancing partitioner for the array
            => Partitioner.Create(properties, true)
                          .AsParallel()
                          // Don't buffer results, so they flow through (and get counted) as they finish
                          .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                          .Select(prop => new KeyValuePair<string, V>(
                                              prop.Name,
                                              prop.Value.ToObject<V>()))
                          .WithProgress(properties.Length,
                                        serializer.Context.Context as IProgress<int>)
                          .ToDictionary();

        /// <summary>
        /// This converter only reads
        /// </summary>
        public override bool CanWrite => false;

        /// <summary>
        /// Not supported, see CanWrite
        /// </summary>
        public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
        {
            throw new NotImplementedException();
        }

        // Only convert when we're an explicit attribute
        public override bool CanConvert(Type object_type) => false;
    }
}
26 changes: 26 additions & 0 deletions Core/Extensions/EnumerableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace CKAN.Extensions
{
Expand Down Expand Up @@ -34,6 +35,31 @@ public static Dictionary<K, V> ToDictionary<K, V>(this IEnumerable<KeyValuePair<
=> pairs.ToDictionary(kvp => kvp.Key,
kvp => kvp.Value);

/// <summary>
/// Collect the key/value pairs produced by a parallel query into a dictionary
/// </summary>
/// <param name="pairs">Parallel sequence of pairs to collect</param>
/// <returns>Dictionary mapping each pair's Key to its Value</returns>
public static Dictionary<K, V> ToDictionary<K, V>(this ParallelQuery<KeyValuePair<K, V>> pairs)
{
    return pairs.ToDictionary(pair => pair.Key,
                              pair => pair.Value);
}

// https://stackoverflow.com/a/55591477/2422988
/// <summary>
/// Pass the items of a parallel query through unchanged while reporting
/// how far through the expected total the query has gotten.
/// </summary>
/// <param name="source">Parallel query to observe</param>
/// <param name="totalCount">Number of items expected from source, used as the 100% mark</param>
/// <param name="progress">Receives integer percentages; if null, source is returned as is</param>
/// <returns>
/// A query yielding the same items as source. Each percentage is reported
/// at most once and in increasing order.
/// </returns>
public static ParallelQuery<T> WithProgress<T>(this ParallelQuery<T> source,
                                               long                  totalCount,
                                               IProgress<int>        progress)
{
    // Without a listener or a meaningful total there is nothing to report,
    // and dividing by a zero total would throw
    if (progress == null || totalCount <= 0)
    {
        return source;
    }
    long count       = 0;
    int  prevPercent = -1;
    var  gate        = new object();
    return source.Select(item =>
    {
        var percent = (int)(100 * Interlocked.Increment(ref count) / totalCount);
        // Cheap unlocked check first, since most items don't cross a percent boundary
        if (percent > Volatile.Read(ref prevPercent))
        {
            // The lambda runs on several worker threads at once; do the
            // compare-and-update under a lock so two threads can't both
            // report the same percentage or report them out of order
            lock (gate)
            {
                if (percent > prevPercent)
                {
                    prevPercent = percent;
                    progress.Report(percent);
                }
            }
        }
        return item;
    });
}

public static IEnumerable<T> Memoize<T>(this IEnumerable<T> source)
{
if (source == null)
Expand Down
7 changes: 6 additions & 1 deletion Core/Registry/InstalledModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,15 @@ private InstalledModule()
[OnDeserialized]
private void DeSerialisationFixes(StreamingContext context)
{
    // Runs after deserialization (see the attribute above).
    // Make sure installed_files is never left null.
    if (installed_files == null)
    {
        installed_files = new Dictionary<string, InstalledModuleFile>();
    }
    if (Platform.IsWindows)
    {
        // We need case insensitive path matching on Windows
        // (copy the entries once into a dictionary with the right comparer)
        installed_files = new Dictionary<string, InstalledModuleFile>(installed_files,
                                                                      StringComparer.OrdinalIgnoreCase);
    }
}

Expand Down
1 change: 1 addition & 0 deletions Core/Registry/Registry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class Registry : IEnlistmentNotification, IRegistryQuerier
private Dictionary<string, string> installed_dlls;

[JsonProperty]
[JsonConverter(typeof(JsonParallelDictionaryConverter<InstalledModule>))]
private Dictionary<string, InstalledModule> installed_modules;

// filename (case insensitive on Windows) => module
Expand Down
19 changes: 19 additions & 0 deletions Core/Repositories/ProgressImmediate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;

namespace CKAN
{
    /// <summary>
    /// An IProgress implementation that hands each reported value
    /// directly to a delegate, on the thread that called Report.
    /// </summary>
    /// <typeparam name="T">Type of the progress values</typeparam>
    public class ProgressImmediate<T> : IProgress<T>
    {
        // Delegate invoked for every reported value
        private readonly Action<T> onReport;

        /// <summary>
        /// Wrap a delegate as a progress object
        /// </summary>
        /// <param name="action">Called with each value passed to Report</param>
        public ProgressImmediate(Action<T> action)
        {
            onReport = action;
        }

        /// <summary>
        /// Forward a progress value to the wrapped delegate
        /// </summary>
        /// <param name="val">The value to forward</param>
        public void Report(T val) => onReport(val);
    }
}
67 changes: 67 additions & 0 deletions Core/Repositories/ProgressScalePercentsByFileSize.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using System;
using System.Collections.Generic;
using System.Linq;

namespace CKAN
{
    /// <summary>
    /// Accepts progress updates in terms of percentage of one file within a group
    /// and translates them into percentages across the whole operation.
    /// Each file contributes in proportion to its share of the group's total size.
    /// </summary>
    public class ProgressScalePercentsByFileSizes : IProgress<int>
    {
        /// <summary>
        /// Initialize a percent-to-scaled-percent progress translator
        /// </summary>
        /// <param name="percentProgress">The upstream progress object expecting percentages</param>
        /// <param name="sizes">Sequence of sizes of files in our group</param>
        public ProgressScalePercentsByFileSizes(IProgress<int>    percentProgress,
                                                IEnumerable<long> sizes)
        {
            this.percentProgress = percentProgress;
            // Snapshot the sizes so they can be indexed and aren't re-enumerated
            this.sizes = sizes.ToArray();
            totalSize  = this.sizes.Sum();
        }

        /// <summary>
        /// The IProgress member called when we advance within the current file
        /// </summary>
        /// <param name="currentFilePercent">How far into the current file we are</param>
        public void Report(int currentFilePercent)
        {
            // With a zero total (no files, or only empty files) there is no
            // per-file share to scale by, and dividing by it would throw
            if (totalSize > 0 && basePercent < 100 && currentIndex < sizes.Length)
            {
                var percent = basePercent + (int)(currentFilePercent * sizes[currentIndex] / totalSize);
                ReportIfAdvanced(percent);
            }
        }

        /// <summary>
        /// Call this when you move on from one file to the next.
        /// Calls made after the last file has been passed are ignored.
        /// </summary>
        public void NextFile()
        {
            if (currentIndex >= sizes.Length)
            {
                // Already past the last file, nothing more to account for
                return;
            }
            doneSize += sizes[currentIndex];
            ++currentIndex;
            basePercent = totalSize > 0
                ? (int)(100 * doneSize / totalSize)
                // Every file is empty, so fall back to the fraction of files finished
                : 100 * currentIndex / sizes.Length;
            ReportIfAdvanced(basePercent);
        }

        // Only report each percentage once, to avoid spamming UI calls
        private void ReportIfAdvanced(int percent)
        {
            if (percent > lastPercent)
            {
                percentProgress?.Report(percent);
                lastPercent = percent;
            }
        }

        private readonly IProgress<int> percentProgress;
        private readonly long[]         sizes;
        private readonly long           totalSize;
        // Sum of the sizes of the files already completed
        private long doneSize     = 0;
        // Index into sizes of the file currently in progress
        private int  currentIndex = 0;
        // Overall percentage represented by the completed files
        private int  basePercent  = 0;
        // Highest percentage sent upstream so far
        private int  lastPercent  = -1;
    }
}
11 changes: 7 additions & 4 deletions Core/Repositories/ReadProgressStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ public ReadProgressStream(Stream stream, IProgress<long> progress)
public override int Read(byte[] buffer, int offset, int count)
{
    int amountRead = base.Read(buffer, offset, count);
    // Don't bother querying Position when nobody is listening
    if (progress != null)
    {
        long newProgress = Position;
        // Only report when the position has moved past the last reported value
        if (newProgress > lastProgress)
        {
            progress.Report(newProgress);
            lastProgress = newProgress;
        }
    }
    return amountRead;
}
Expand Down
27 changes: 24 additions & 3 deletions Core/Repositories/RepositoryData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Text;
using System.Linq;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Runtime.Serialization;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
Expand All @@ -26,6 +28,7 @@ public class RepositoryData
/// The available modules from this repository
/// </summary>
[JsonProperty("available_modules", NullValueHandling = NullValueHandling.Ignore)]
[JsonConverter(typeof(JsonParallelDictionaryConverter<AvailableModule>))]
public readonly Dictionary<string, AvailableModule> AvailableModules;

/// <summary>
Expand Down Expand Up @@ -92,19 +95,37 @@ public void SaveTo(string path)
/// Load a previously cached repo data object from JSON on disk
/// </summary>
/// <param name="path">Filename of the JSON file to load</param>
/// <param name="progress">Progress notifier to receive updates of percent completion of this file</param>
/// <returns>A repo data object or null if loading fails</returns>
public static RepositoryData FromJson(string path, IProgress<long> progress)
public static RepositoryData FromJson(string path, IProgress<int> progress)
{
try
{
long fileSize = new FileInfo(path).Length;
log.DebugFormat("Trying to load repository data from {0}", path);
// Ain't OOP grand?!
using (var stream = File.Open(path, FileMode.Open))
using (var progressStream = new ReadProgressStream(stream, progress))
using (var progressStream = new ReadProgressStream(
stream,
progress == null
? null
// Treat JSON parsing as the first 50%
: new ProgressImmediate<long>(p => progress.Report((int)(50 * p / fileSize)))))
using (var reader = new StreamReader(progressStream))
using (var jStream = new JsonTextReader(reader))
{
return new JsonSerializer().Deserialize<RepositoryData>(jStream);
var settings = new JsonSerializerSettings()
{
Context = new StreamingContext(
StreamingContextStates.Other,
progress == null
? null
: new ProgressImmediate<int>(p =>
// Treat CkanModule creation as the last 50%
progress.Report(50 + p / 2))),
};
return JsonSerializer.Create(settings)
.Deserialize<RepositoryData>(jStream);
}
}
catch (Exception exc)
Expand Down
9 changes: 5 additions & 4 deletions Core/Repositories/RepositoryDataManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void Prepopulate(List<Repository> repos, IProgress<int> percentProgress)
new FileInfo(tuple.Item2).Length))
.ToList();
// Translate from file group offsets to percent
var progress = new ProgressFilesOffsetsToPercent(
var progress = new ProgressScalePercentsByFileSizes(
percentProgress, reposAndSizes.Select(tuple => tuple.Item2));
foreach (var repo in reposAndSizes.Select(tuple => tuple.Item1))
{
Expand Down Expand Up @@ -258,10 +258,11 @@ private RepositoryData GetRepoData(Repository repo)
? data
: LoadRepoData(repo, null);

private RepositoryData LoadRepoData(Repository repo, IProgress<long> progress)
private RepositoryData LoadRepoData(Repository repo, IProgress<int> progress)
{
log.DebugFormat("Looking for data in {0}", GetRepoDataPath(repo));
var data = RepositoryData.FromJson(GetRepoDataPath(repo), progress);
var path = GetRepoDataPath(repo);
log.DebugFormat("Looking for data in {0}", path);
var data = RepositoryData.FromJson(path, progress);
if (data != null)
{
log.Debug("Found it! Adding...");
Expand Down
6 changes: 6 additions & 0 deletions Core/Types/CkanModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Runtime.Serialization;
using System.Text;
using System.Text.RegularExpressions;
using System.Reflection;

using Autofac;
using log4net;
Expand Down Expand Up @@ -281,6 +282,11 @@ internal CkanModule()
// We don't have this passed in, so we'll ask the service locator
// directly. Yuck.
_comparator = ServiceLocator.Container.Resolve<IGameComparator>();
download_content_type = typeof(CkanModule).GetTypeInfo()
.GetDeclaredField("download_content_type")
.GetCustomAttribute<DefaultValueAttribute>()
.Value
.ToString();
}

/// <summary>
Expand Down
6 changes: 6 additions & 0 deletions Core/Types/ModuleInstallDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Text;
using System.Text.RegularExpressions;
using System.Runtime.CompilerServices;
using System.Reflection;

using ICSharpCode.SharpZipLib.Zip;
using Newtonsoft.Json;
Expand Down Expand Up @@ -109,6 +110,11 @@ internal void DeSerialisationFixes(StreamingContext like_i_could_care)
[JsonConstructor]
private ModuleInstallDescriptor()
{
    // Start install_to at the value named in the [DefaultValue] attribute
    // declared on the install_to field itself, looked up via reflection
    // NOTE(review): presumably the deserializer overwrites this when the
    // JSON supplies a value - confirm
    var installToField = typeof(ModuleInstallDescriptor).GetTypeInfo()
                                                        .GetDeclaredField("install_to");
    var defaultAttrib  = installToField.GetCustomAttribute<DefaultValueAttribute>();
    install_to = defaultAttrib.Value.ToString();
}

/// <summary>
Expand Down

0 comments on commit b3ae777

Please sign in to comment.