Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
santisq committed Jul 14, 2024
2 parents cd85e5a + bca6ae7 commit 74d6ac3
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 234 deletions.
2 changes: 1 addition & 1 deletion module/PSParallelPipeline.psd1
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
RootModule = 'bin/netstandard2.0/PSParallelPipeline.dll'

# Version number of this module.
ModuleVersion = '1.1.9'
ModuleVersion = '1.2.0'

# Supported PSEditions
# CompatiblePSEditions = @()
Expand Down
24 changes: 14 additions & 10 deletions src/PSParallelPipeline/Commands/InvokeParallelCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected override void BeginProcessing()
_worker.CancelAfter(TimeSpan.FromSeconds(TimeoutSeconds));
}

_worker.Start();
_worker.Run();
}

protected override void ProcessRecord()
Expand All @@ -90,12 +90,13 @@ protected override void ProcessRecord()
}
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
{
_worker.StopAndWait();
_worker.Cancel();
_worker.Wait();
throw;
}
catch (OperationCanceledException exception)
{
_worker.WaitOperationCanceled();
_worker.Wait();
exception.WriteTimeoutError(this);
}
catch (Exception exception)
Expand All @@ -115,17 +116,17 @@ protected override void EndProcessing()
{
ProcessOutput(data);
}

_worker.Wait();
}
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
{
_worker.StopAndWait();
_worker.Cancel();
_worker.Wait();
throw;
}
catch (OperationCanceledException exception)
{
_worker.WaitOperationCanceled();
_worker.Wait();
exception.WriteTimeoutError(this);
}
catch (Exception exception)
Expand All @@ -147,7 +148,8 @@ private void ProcessOutput(PSOutputData data)
break;

case Type.Debug:
WriteDebug((string)data.Output);
DebugRecord debug = (DebugRecord)data.Output;
WriteDebug(debug.Message);
break;

case Type.Information:
Expand All @@ -159,16 +161,18 @@ private void ProcessOutput(PSOutputData data)
break;

case Type.Verbose:
WriteVerbose((string)data.Output);
VerboseRecord verbose = (VerboseRecord)data.Output;
WriteVerbose(verbose.Message);
break;

case Type.Warning:
WriteWarning((string)data.Output);
WarningRecord warning = (WarningRecord)data.Output;
WriteWarning(warning.Message);
break;
}
}

protected override void StopProcessing() => _worker?.StopAndWait();
protected override void StopProcessing() => _worker?.Cancel();

public void Dispose()
{
Expand Down
5 changes: 3 additions & 2 deletions src/PSParallelPipeline/ExceptionHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ internal static void WriteUnspecifiedError(this Exception exception, PSCmdlet cm
cmdlet.WriteError(new ErrorRecord(
exception, "UnspecifiedCmdletError", ErrorCategory.NotSpecified, cmdlet));

internal static ErrorRecord CreateProcessingTaskError(this Exception exception, object context) =>
new(exception, "ProcessingTask", ErrorCategory.NotSpecified, context);
internal static PSOutputData CreateProcessingTaskError(this Exception exception, object context) =>
PSOutputData.WriteError(new ErrorRecord(
exception, "ProcessingTask", ErrorCategory.NotSpecified, context));

private static bool ValueIsNotScriptBlock(object? value) =>
value is not ScriptBlock and not PSObject { BaseObject: ScriptBlock };
Expand Down
18 changes: 9 additions & 9 deletions src/PSParallelPipeline/PSOutputData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ internal record struct PSOutputData(Type Type, object Output)
internal static PSOutputData WriteObject(object sendToPipeline) =>
new(Type.Success, sendToPipeline);

internal static PSOutputData WriteError(ErrorRecord error) =>
internal static PSOutputData WriteError(object error) =>
new(Type.Error, error);

internal static PSOutputData WriteDebug(DebugRecord debug) =>
new(Type.Debug, debug.Message);
internal static PSOutputData WriteDebug(object debug) =>
new(Type.Debug, debug);

internal static PSOutputData WriteInformation(InformationRecord information) =>
internal static PSOutputData WriteInformation(object information) =>
new(Type.Information, information);

internal static PSOutputData WriteProgress(ProgressRecord progress) =>
internal static PSOutputData WriteProgress(object progress) =>
new(Type.Progress, progress);

internal static PSOutputData WriteVerbose(VerboseRecord verbose) =>
new(Type.Verbose, verbose.Message);
internal static PSOutputData WriteVerbose(object verbose) =>
new(Type.Verbose, verbose);

internal static PSOutputData WriteWarning(WarningRecord warning) =>
new(Type.Warning, warning.Message);
internal static PSOutputData WriteWarning(object warning) =>
new(Type.Warning, warning);
}
94 changes: 22 additions & 72 deletions src/PSParallelPipeline/PSOutputStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,91 +33,41 @@ internal PSOutputStreams(Worker worker)
SetStreamHandlers();
}

internal void AddOutput(PSOutputData data) => OutputPipe.Add(data);

private void SetStreamHandlers()
{
Success.DataAdded += (s, e) =>
{
foreach (PSObject data in Success.ReadAll())
{
WriteObject(data);
}
};

Error.DataAdded += (s, e) =>
{
foreach (ErrorRecord error in Error.ReadAll())
{
WriteError(error);
}
};

Debug.DataAdded += (s, e) =>
{
foreach (DebugRecord debug in Debug.ReadAll())
{
WriteDebug(debug);
}
};


Information.DataAdded += (s, e) =>
{
foreach (InformationRecord information in Information.ReadAll())
{
WriteInformation(information);
}
};

Progress.DataAdded += (s, e) =>
{
foreach (ProgressRecord progress in Progress.ReadAll())
{
WriteProgress(progress);
}
};

Verbose.DataAdded += (s, e) =>
{
foreach (VerboseRecord verbose in Verbose.ReadAll())
{
WriteVerbose(verbose);
}
};

Warning.DataAdded += (s, e) =>
{
foreach (WarningRecord warning in Warning.ReadAll())
{
WriteWarning(warning);
}
};
}
Success.DataAdding += (s, e) =>
AddOutput(PSOutputData.WriteObject(e.ItemAdded));

private void WriteObject(PSObject data) =>
OutputPipe.Add(PSOutputData.WriteObject(data), Token);
Error.DataAdding += (s, e) =>
AddOutput(PSOutputData.WriteError(e.ItemAdded));

internal void WriteError(ErrorRecord error) =>
OutputPipe.Add(PSOutputData.WriteError(error), Token);
Debug.DataAdding += (s, e) =>
AddOutput(PSOutputData.WriteDebug(e.ItemAdded));

private void WriteDebug(DebugRecord debug) =>
OutputPipe.Add(PSOutputData.WriteDebug(debug), Token);
Information.DataAdding += (s, e) =>
AddOutput(PSOutputData.WriteInformation(e.ItemAdded));

private void WriteInformation(InformationRecord information) =>
OutputPipe.Add(PSOutputData.WriteInformation(information), Token);
Progress.DataAdding += (s, e) =>
AddOutput(PSOutputData.WriteProgress(e.ItemAdded));

private void WriteProgress(ProgressRecord progress) =>
OutputPipe.Add(PSOutputData.WriteProgress(progress), Token);
Verbose.DataAdding += (s, e) =>
AddOutput(PSOutputData.WriteVerbose(e.ItemAdded));

private void WriteVerbose(VerboseRecord verbose) =>
OutputPipe.Add(PSOutputData.WriteVerbose(verbose), Token);

private void WriteWarning(WarningRecord warning) =>
OutputPipe.Add(PSOutputData.WriteWarning(warning), Token);
Warning.DataAdding += (s, e) =>
AddOutput(PSOutputData.WriteWarning(e.ItemAdded));
}

public void Dispose()
{
Success.Dispose();
Error.Dispose();
Debug.Dispose();
Information.Dispose();
Progress.Dispose();
Verbose.Dispose();
Warning.Dispose();
OutputPipe.Dispose();
GC.SuppressFinalize(this);
}
Expand Down
35 changes: 22 additions & 13 deletions src/PSParallelPipeline/PSTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ namespace PSParallelPipeline;

internal sealed class PSTask : IDisposable
{
private PSOutputStreams OutputStreams { get => _pool.PSOutputStreams; }

private readonly PowerShell _powershell;

private readonly PSDataStreams _internalStreams;

private readonly RunspacePool _pool;

private PSOutputStreams OutputStreams { get => _pool.PSOutputStreams; }

internal Runspace Runspace
{
get => _powershell.Runspace;
Expand Down Expand Up @@ -70,30 +70,39 @@ internal PSTask AddScript(ScriptBlock script)
return this;
}

internal PSTask AddUsingStatements(Dictionary<string, object?>? usingParams)
internal void AddUsingStatements(Dictionary<string, object?> usingParams)
{
if (usingParams is { Count: > 0 })
if (usingParams.Count > 0 )
{
_powershell.AddParameters(new Dictionary<string, Dictionary<string, object?>>
{
["--%"] = usingParams
});
}

return this;
}

internal async Task<PSTask> InvokeAsync()
internal async Task InvokeAsync()
{
using CancellationTokenRegistration _ = _pool.RegisterCancellation(CancelCallback(this));
await InvokePowerShellAsync(_powershell, OutputStreams.Success);
return this;
try
{
using CancellationTokenRegistration _ = _pool.RegisterCancellation(CancelCallback(this));
await InvokePowerShellAsync(_powershell, OutputStreams.Success);
}
catch (Exception exception)
{
OutputStreams.AddOutput(exception.CreateProcessingTaskError(this));
}
finally
{
_pool.CompleteTask(this);
_pool.Release();
}
}

private static Action CancelCallback(PSTask task) => delegate
private static Action CancelCallback(PSTask psTask) => delegate
{
task.Dispose();
task.Runspace.Dispose();
psTask.Dispose();
psTask.Runspace.Dispose();
};

public void Dispose()
Expand Down
Loading

0 comments on commit 74d6ac3

Please sign in to comment.