Skip to content

Commit

Permalink
Merge pull request #43 from santisq/42-add-a-more-verbose-module-desc…
Browse files Browse the repository at this point in the history
…ription

updates manifest description. bit of code refactoring
  • Loading branch information
santisq authored Jan 20, 2025
2 parents 74d6ac3 + dcdb5b4 commit 22b0f30
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 69 deletions.
10 changes: 6 additions & 4 deletions module/PSParallelPipeline.psd1
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
RootModule = 'bin/netstandard2.0/PSParallelPipeline.dll'

# Version number of this module.
ModuleVersion = '1.2.0'
ModuleVersion = '1.2.1'

# Supported PSEditions
# CompatiblePSEditions = @()
Expand All @@ -29,7 +29,7 @@
Copyright = '(c) Santiago Squarzon. All rights reserved.'

# Description of the functionality provided by this module
Description = 'Enables parallel processing of pipeline input objects.'
Description = 'Includes Invoke-Parallel cmdlet, allowing for parallel processing of input objects, sharing similar capabilities as ForEach-Object -Parallel.'

# Minimum version of the PowerShell engine required by this module
PowerShellVersion = '5.1'
Expand Down Expand Up @@ -94,11 +94,13 @@
# Tags applied to this module. These help with module discovery in online galleries.
Tags = @(
'parallel'
'concurrency'
'runspace'
'parallel-processing'
'powershell'
'multithreading'
'foreach'
'pipeline'
'threads'
'ForEach-Object'
)

# A URL to the license for this module.
Expand Down
2 changes: 1 addition & 1 deletion src/PSParallelPipeline/CommandCompleter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public IEnumerable<CompletionResult> CompleteArgument(
{
try
{
_builtinFuncs ??= new HashSet<string>(GetBuiltinFunctions());
_builtinFuncs ??= [.. GetBuiltinFunctions()];

return CompletionCompleters
.CompleteCommand(
Expand Down
22 changes: 8 additions & 14 deletions src/PSParallelPipeline/Commands/InvokeParallelCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
using System.Collections;
using System.Management.Automation;
using System.Management.Automation.Runspaces;
using PSParallelPipeline.Poly;

namespace PSParallelPipeline;
namespace PSParallelPipeline.Commands;

[Cmdlet(VerbsLifecycle.Invoke, "Parallel")]
[Alias("parallel")]
[OutputType(typeof(object))]
public sealed class InvokeParallelCommand : PSCmdlet, IDisposable
{
private Worker? _worker;

[Parameter(Position = 0, Mandatory = true)]
public ScriptBlock ScriptBlock { get; set; } = null!;

Expand Down Expand Up @@ -41,21 +44,12 @@ public sealed class InvokeParallelCommand : PSCmdlet, IDisposable
[Alias("unr")]
public SwitchParameter UseNewRunspace { get; set; }

private Worker? _worker;

protected override void BeginProcessing()
{
InitialSessionState iss = InitialSessionState.CreateDefault2();

if (Functions is not null)
{
iss.AddFunctions(Functions, this);
}

if (Variables is not null)
{
iss.AddVariables(Variables, this);
}
InitialSessionState iss = InitialSessionState
.CreateDefault2()
.AddFunctions(Functions, this)
.AddVariables(Variables, this);

PoolSettings poolSettings = new()
{
Expand Down
66 changes: 42 additions & 24 deletions src/PSParallelPipeline/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Management.Automation;
using System.Management.Automation.Language;
using System.Management.Automation.Runspaces;
Expand All @@ -10,52 +11,67 @@ namespace PSParallelPipeline;

internal static class Extensions
{
internal static void AddFunctions(
internal static InitialSessionState AddFunctions(
this InitialSessionState initialSessionState,
string[] functionsToAdd,
string[]? functionsToAdd,
PSCmdlet cmdlet)
{
foreach (string function in functionsToAdd)
if (functionsToAdd is not null)
{
CommandInfo? commandInfo = cmdlet
.InvokeCommand
.GetCommand(function, CommandTypes.Function);

if (commandInfo is null)
foreach (string function in functionsToAdd)
{
continue;
CommandInfo? commandInfo = cmdlet
.InvokeCommand
.GetCommand(function, CommandTypes.Function);

if (commandInfo is null)
{
continue;
}

initialSessionState.Commands.Add(new SessionStateFunctionEntry(
name: function,
definition: commandInfo.Definition));
}

initialSessionState.Commands.Add(new SessionStateFunctionEntry(
name: function,
definition: commandInfo.Definition));
}

return initialSessionState;
}

internal static void AddVariables(
internal static InitialSessionState AddVariables(
this InitialSessionState initialSessionState,
Hashtable variables,
Hashtable? variables,
PSCmdlet cmdlet)
{
foreach (DictionaryEntry pair in variables)
if (variables is not null)
{
cmdlet.ThrowIfVariableIsScriptBlock(pair.Value);
initialSessionState.Variables.Add(new SessionStateVariableEntry(
name: LanguagePrimitives.ConvertTo<string>(pair.Key),
value: pair.Value,
description: null));
foreach (DictionaryEntry pair in variables)
{
cmdlet.ThrowIfVariableIsScriptBlock(pair.Value);
initialSessionState.Variables.Add(new SessionStateVariableEntry(
name: LanguagePrimitives.ConvertTo<string>(pair.Key),
value: pair.Value,
description: null));
}
}

return initialSessionState;
}

internal static Dictionary<string, object?> GetUsingParameters(
this ScriptBlock script,
PSCmdlet cmdlet)
{
Dictionary<string, object?> usingParams = [];
IEnumerable<UsingExpressionAst> usingExpressionAsts = script.Ast
.FindAll((a) => a is UsingExpressionAst, true)
.Cast<UsingExpressionAst>();

foreach (UsingExpressionAst usingStatement in script.Ast.FindAll((a) => a is UsingExpressionAst, true))
foreach (UsingExpressionAst usingStatement in usingExpressionAsts)
{
VariableExpressionAst backingVariableAst = UsingExpressionAst.ExtractUsingVariable(usingStatement);
VariableExpressionAst backingVariableAst = UsingExpressionAst
.ExtractUsingVariable(usingStatement);

string varPath = backingVariableAst.VariablePath.UserPath;

string varText = usingStatement.ToString();
Expand Down Expand Up @@ -89,7 +105,9 @@ internal static void AddVariables(
object? value,
ExpressionAst ast)
{
VariableExpressionAst usingVariable = (VariableExpressionAst)ast.Find(a => a is VariableExpressionAst, false);
VariableExpressionAst usingVariable = (VariableExpressionAst)ast
.Find(a => a is VariableExpressionAst, false);

ExpressionAst lookupAst = new ConstantExpressionAst(ast.Extent, value);
Ast? currentAst = usingVariable;

Expand Down
2 changes: 0 additions & 2 deletions src/PSParallelPipeline/PSOutputData.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using System.Management.Automation;

namespace PSParallelPipeline;

internal enum Type
Expand Down
2 changes: 0 additions & 2 deletions src/PSParallelPipeline/PSOutputStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ internal sealed class PSOutputStreams : IDisposable
{
private BlockingCollection<PSOutputData> OutputPipe { get => _worker.OutputPipe; }

private CancellationToken Token { get => _worker.Token; }

internal PSDataCollection<PSObject> Success { get; } = [];

internal PSDataCollection<ErrorRecord> Error { get; } = [];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;

namespace PSParallelPipeline;
namespace PSParallelPipeline.Poly;

internal static class Dbg
{
Expand Down
File renamed without changes.
42 changes: 21 additions & 21 deletions src/PSParallelPipeline/RunspacePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ internal RunspacePool(PoolSettings settings, Worker worker)

internal void Release() => _semaphore.Release();

private Runspace CreateRunspace()
{
Runspace rs = RunspaceFactory.CreateRunspace(InitialSessionState);
rs.Open();
return rs;
}

internal void CompleteTask(PSTask psTask)
{
psTask.Dispose();
Expand All @@ -62,17 +55,6 @@ internal void CompleteTask(PSTask psTask)
_pool.Enqueue(psTask.Runspace);
}

private async Task<Runspace> GetRunspaceAsync()
{
await _semaphore.WaitAsync(Token);
if (_pool.TryDequeue(out Runspace runspace))
{
return runspace;
}

return CreateRunspace();
}

internal async Task EnqueueAsync(PSTask psTask)
{
psTask.AddUsingStatements(UsingStatements);
Expand All @@ -88,17 +70,35 @@ internal async Task ProcessAllAsync()
}
}

internal CancellationTokenRegistration RegisterCancellation(Action callback) =>
Token.Register(callback);

internal async Task WaitOnCancelAsync() => await Task.WhenAll(_tasks);

private async Task ProcessAnyAsync()
{
Task task = await Task.WhenAny(_tasks);
_tasks.Remove(task);
await task;
}

internal CancellationTokenRegistration RegisterCancellation(Action callback) =>
Token.Register(callback);
private Runspace CreateRunspace()
{
Runspace rs = RunspaceFactory.CreateRunspace(InitialSessionState);
rs.Open();
return rs;
}

internal async Task WaitOnCancelAsync() => await Task.WhenAll(_tasks);
private async Task<Runspace> GetRunspaceAsync()
{
await _semaphore.WaitAsync(Token);
if (_pool.TryDequeue(out Runspace runspace))
{
return runspace;
}

return CreateRunspace();
}

public void Dispose()
{
Expand Down

0 comments on commit 22b0f30

Please sign in to comment.