# PSParallelPipeline.psm1

using namespace System
using namespace System.Collections
using namespace System.Collections.Generic
using namespace System.Management.Automation
using namespace System.Management.Automation.Language
using namespace System.Diagnostics
using namespace System.Management.Automation.Host
using namespace System.Text
using namespace System.Threading
using namespace System.Management.Automation.Runspaces
#Region '.\private\CommandCompleter.ps1' 0
#using namespace System
#using namespace System.Collections
#using namespace System.Collections.Generic
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Language

# Argument completer that offers function names for the word being typed.
# Attached to the -Functions parameter of Invoke-Parallel via [ArgumentCompleter()].
class CommandCompleter : IArgumentCompleter {
    # Returns completion results for $wordToComplete, restricted to commands of
    # type Function. The remaining parameters are required by the
    # IArgumentCompleter contract and are not consulted here.
    [IEnumerable[CompletionResult]] CompleteArgument(
        [string] $commandName,
        [string] $parameterName,
        [string] $wordToComplete,
        [CommandAst] $commandAst,
        [IDictionary] $fakeBoundParameters
    ) {
        # [NullString]::Value is passed as the module name so that a real null
        # (rather than an empty string) reaches the .NET method.
        $completions = [CompletionCompleters]::CompleteCommand(
            $wordToComplete,
            [NullString]::Value,
            [CommandTypes]::Function)

        return $completions
    }
}
#EndRegion '.\private\CommandCompleter.ps1' 21
#Region '.\private\InvocationManager.ps1' 0
#using namespace System.Collections.Generic
#using namespace System.Diagnostics
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Host
#using namespace System.Management.Automation.Language
#using namespace System.Text
#using namespace System.Threading

# Owns the pool of runspaces used by Invoke-Parallel together with the list of
# tasks currently running on them. At most ThrottleLimit runspaces are created
# through TryGet(); released runspaces are kept on a stack for reuse.
class InvocationManager : IDisposable {
    [int] $ThrottleLimit
    [PSHost] $PSHost
    [initialsessionstate] $InitialSessionState
    # Idle runspaces that are ready to be handed out again.
    [Stack[runspace]] $Runspaces = [Stack[runspace]]::new()
    # Tasks that have been started and not yet collected by GetTaskResult().
    [List[PSParallelTask]] $Tasks = [List[PSParallelTask]]::new()
    [bool] $UseNewRunspace
    # Number of runspaces created on demand by TryGet().
    hidden [int] $TotalMade

    InvocationManager(
        [int] $ThrottleLimit,
        [PSHost] $PSHost,
        [initialsessionstate] $InitialSessionState,
        [bool] $UseNewRunspace
    ) {
        $this.ThrottleLimit = $ThrottleLimit
        $this.PSHost = $PSHost
        $this.InitialSessionState = $InitialSessionState
        $this.UseNewRunspace = $UseNewRunspace
    }

    # Returns an idle runspace from the pool, or opens a new one while fewer
    # than ThrottleLimit have been made. Returns $null when the limit has been
    # reached and no runspace is idle.
    [runspace] TryGet() {
        if ($this.Runspaces.Count) {
            return $this.Runspaces.Pop()
        }

        if ($this.TotalMade -ge $this.ThrottleLimit) {
            return $null
        }

        $this.TotalMade++
        return $this.CreateRunspace()
    }

    # Creates and opens a runspace bound to the stored host and session state.
    [runspace] CreateRunspace() {
        $runspace = [runspacefactory]::CreateRunspace($this.PSHost, $this.InitialSessionState)
        $runspace.Open()
        return $runspace
    }

    # Blocks until any registered task signals completion and returns it.
    # Returns $null when there are no tasks. The wait is done in 200 ms slices
    # rather than one indefinite wait (presumably so the calling thread stays
    # responsive to a stop request - confirm).
    [PSParallelTask] WaitAny() {
        if (-not $this.Tasks.Count) {
            return $null
        }

        do {
            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while ($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    # Same as WaitAny(), but once $Timer has run for more than $TimeoutSeconds
    # the first registered task is stopped and returned instead of waiting.
    [PSParallelTask] WaitAny([int] $TimeoutSeconds, [Stopwatch] $Timer) {
        if (-not $this.Tasks.Count) {
            return $null
        }

        do {
            if ($TimeoutSeconds -lt $Timer.Elapsed.TotalSeconds) {
                $this.Tasks[0].Stop()
                return $this.Tasks[0]
            }

            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while ($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    # Unregisters $Task, hands its runspace back to the pool, forwards its
    # output and errors through the task's cmdlet, and finally disposes the task.
    [void] GetTaskResult([PSParallelTask] $Task) {
        try {
            $this.Tasks.Remove($Task)
            $this.Release($Task.GetRunspace())
            $Task.EndInvoke()
        }
        finally {
            if ($Task -is [IDisposable]) {
                $Task.Dispose()
            }
        }
    }

    # Returns a runspace to the pool. With UseNewRunspace the used runspace is
    # disposed and a freshly opened one takes its place.
    [void] Release([runspace] $runspace) {
        if ($this.UseNewRunspace) {
            $runspace.Dispose()
            $runspace = $this.CreateRunspace()
        }

        $this.Runspaces.Push($runspace)
    }

    # Registers $Task and starts it.
    [void] AddTask([PSParallelTask] $Task) {
        $this.Tasks.Add($Task)
        $Task.Run()
    }

    # Disposes everything this manager still owns: tasks that were never
    # collected (with their runspaces) and every idle runspace in the pool.
    [void] Dispose() {
        # Tasks can still be registered if an error interrupted the loop that
        # collects results; their runspaces are not in the pool, so they have
        # to be cleaned up here.
        foreach ($pending in $this.Tasks.ToArray()) {
            try {
                $pending.Stop()
                $attached = $pending.GetRunspace()
                $pending.Dispose()

                if ($attached) {
                    $attached.Dispose()
                }
            }
            catch {
                # Best-effort cleanup: keep going so the remaining resources
                # are still released.
            }
        }

        $this.Tasks.Clear()

        # Drain the pool directly. Going through TryGet() here would open new
        # runspaces whenever fewer than ThrottleLimit had been made, only to
        # dispose them immediately.
        while ($this.Runspaces.Count) {
            $this.Runspaces.Pop().Dispose()
        }
    }

    # Collects the values of every $using: expression found in $scriptblock.
    # Returns a hashtable whose keys are the Base64 encoding of the UTF-16 text
    # of each using expression (lower-cased when the expression is a plain
    # variable) and whose values are read from $cmdlet's session state.
    # Throws a terminating error through $cmdlet when a value is a script block.
    static [hashtable] GetUsingStatements([scriptblock] $scriptblock, [PSCmdlet] $cmdlet) {
        $usingParams = @{}
        foreach ($usingstatement in $scriptblock.Ast.FindAll({ $args[0] -is [UsingExpressionAst] }, $true)) {
            $variableAst = [UsingExpressionAst]::ExtractUsingVariable($usingstatement)
            $varPath = $variableAst.VariablePath.UserPath
            $varText = $usingstatement.ToString()

            if ($usingstatement.SubExpression -is [VariableExpressionAst]) {
                $varText = $varText.ToLowerInvariant()
            }

            $key = [Convert]::ToBase64String([Encoding]::Unicode.GetBytes($varText))

            # The same expression may appear several times; resolve it once.
            if ($usingParams.ContainsKey($key)) {
                continue
            }

            $value = $cmdlet.SessionState.PSVariable.GetValue($varPath)

            if ($value -is [scriptblock]) {
                $cmdlet.ThrowTerminatingError([ErrorRecord]::new(
                    [PSArgumentException]::new('Passed-in script block variables are not supported.'),
                    'VariableCannotBeScriptBlock',
                    [ErrorCategory]::InvalidType,
                    $value))
            }

            # For expressions such as $using:var[0].Name, apply each index and
            # member access found in the sub-expression to the variable's value.
            if ($usingstatement.SubExpression -isnot [VariableExpressionAst]) {
                [Stack[Ast]] $subexpressionStack = $usingstatement.SubExpression.FindAll({
                        $args[0] -is [IndexExpressionAst] -or
                        $args[0] -is [MemberExpressionAst] },
                    $false)

                while ($subexpressionStack.Count) {
                    $subexpression = $subexpressionStack.Pop()
                    if ($subexpression -is [IndexExpressionAst]) {
                        $idx = $subexpression.Index.SafeGetValue()
                        $value = $value[$idx]
                        continue
                    }

                    if ($subexpression -is [MemberExpressionAst]) {
                        $member = $subexpression.Member.SafeGetValue()
                        $value = $value.$member
                    }
                }
            }

            $usingParams.Add($key, $value)
        }

        return $usingParams
    }
}
#EndRegion '.\private\InvocationManager.ps1' 167
#Region '.\private\PSParallelTask.ps1' 0
#using namespace System.Management.Automation

# One unit of parallel work: a PowerShell instance that invokes the user's
# script block for a single pipeline object, plus the cmdlet through which its
# output and errors are written.
class PSParallelTask : IDisposable {
    [powershell] $Instance
    [IAsyncResult] $AsyncResult
    [PSCmdlet] $Cmdlet

    # Builds the PowerShell instance. The action is rebuilt from its AST and
    # invoked with $_ set to the pipeline object.
    PSParallelTask([scriptblock] $Action, [object] $PipelineObject, [PSCmdlet] $Cmdlet) {
        # Thanks to Patrick Meinecke for his help here.
        # https://github.com/SeeminglyScience/
        $this.Cmdlet = $Cmdlet

        $shell = [powershell]::Create()

        $null = $shell.AddScript({
                param([scriptblock] $Action, [object] $Context)

                $Action.InvokeWithContext($null, [psvariable]::new('_', $Context))
            })

        $null = $shell.AddParameters(@{
                Action  = $Action.Ast.GetScriptBlock()
                Context = $PipelineObject
            })

        $this.Instance = $shell
    }

    # Attaches the resolved $using: values, if any, as a single parameter
    # named '--%'. Returns this task so calls can be chained.
    [PSParallelTask] AddUsingStatements([hashtable] $UsingStatements) {
        if (-not $UsingStatements.Count) {
            return $this
        }

        # Credits to Jordan Borean for his help here.
        # https://github.com/jborean93
        $this.Instance.AddParameters(@{ '--%' = $UsingStatements })
        return $this
    }

    # Starts the invocation asynchronously and keeps the async handle.
    [void] Run() {
        $this.AsyncResult = $this.Instance.BeginInvoke()
    }

    # Completes the invocation: writes the output (enumerated) and then the
    # error-stream records through the cmdlet. Any exception raised while doing
    # so is written as a non-terminating error.
    [void] EndInvoke() {
        try {
            $output = $this.Instance.EndInvoke($this.AsyncResult)
            $this.Cmdlet.WriteObject($output, $true)
            $this.GetErrors()
        }
        catch [PipelineStoppedException] {
            $this.Cmdlet.WriteError($_)
        }
        catch {
            $this.Cmdlet.WriteError($_)
        }
    }

    # Stops the underlying PowerShell instance.
    [void] Stop() {
        $this.Instance.Stop()
    }

    # Forwards every record of the instance's error stream to the cmdlet when
    # the instance reports that it had errors.
    [void] GetErrors() {
        if (-not $this.Instance.HadErrors) {
            return
        }

        foreach ($record in $this.Instance.Streams.Error) {
            $this.Cmdlet.WriteError($record)
        }
    }

    # Sets the runspace the instance will run in. Returns this task so calls
    # can be chained.
    [PSParallelTask] AssociateWith([runspace] $Runspace) {
        $this.Instance.Runspace = $Runspace
        return $this
    }

    # Returns the runspace currently assigned to the instance.
    [runspace] GetRunspace() {
        return $this.Instance.Runspace
    }

    # Disposes the underlying PowerShell instance.
    [void] Dispose() {
        $this.Instance.Dispose()
    }
}
#EndRegion '.\private\PSParallelTask.ps1' 75
#Region '.\public\Invoke-Parallel.ps1' 0
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Runspaces

# .ExternalHelp PSParallelPipeline-help.xml
function Invoke-Parallel {
    # Runs -ScriptBlock once for every pipeline input object, each invocation in
    # a runspace obtained from an InvocationManager limited to -ThrottleLimit
    # runspaces. -Variables and -Functions are added to the initial session
    # state of those runspaces; $using: expressions in the script block are
    # resolved once in the begin block. With -TimeoutSeconds, input arriving
    # after the timeout is not started and running tasks are stopped.
    [CmdletBinding(PositionalBinding = $false)]
    [Alias('parallel')]
    param(
        [Parameter(Mandatory, ValueFromPipeline)]
        [object] $InputObject,

        [Parameter(Mandatory, Position = 0)]
        [scriptblock] $ScriptBlock,

        [Parameter()]
        [ValidateRange(1, 63)]
        [int] $ThrottleLimit = 5,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [hashtable] $Variables,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [ArgumentCompleter([CommandCompleter])]
        [string[]] $Functions,

        [Parameter()]
        [switch] $UseNewRunspace,

        [Parameter()]
        [ValidateRange(1, [int]::MaxValue)]
        [int] $TimeoutSeconds
    )

    begin {
        $usingParams = [InvocationManager]::GetUsingStatements(
            $ScriptBlock,
            $PSCmdlet)

        try {
            $iss = [initialsessionstate]::CreateDefault2()

            foreach ($key in $Variables.PSBase.Keys) {
                if ($Variables[$key] -is [scriptblock]) {
                    $PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
                        [PSArgumentException]::new('Passed-in script block variables are not supported.'),
                        'VariableCannotBeScriptBlock',
                        [ErrorCategory]::InvalidType,
                        $Variables[$key]))
                }

                $iss.Variables.Add(
                    [SessionStateVariableEntry]::new($key, $Variables[$key], ''))
            }

            foreach ($function in $Functions) {
                $def = $PSCmdlet.InvokeCommand.GetCommand(
                    $function, [CommandTypes]::Function)

                # Without this check an unknown name would be registered as a
                # function with an empty body ($null becomes '' when passed to
                # a [string] constructor parameter) and fail silently later.
                if (-not $def) {
                    $PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
                        [CommandNotFoundException]::new("The function '$function' could not be found."),
                        'FunctionNotFound',
                        [ErrorCategory]::ObjectNotFound,
                        $function))
                }

                $iss.Commands.Add(
                    [SessionStateFunctionEntry]::new($function, $def.Definition))
            }

            $im = [InvocationManager]::new($ThrottleLimit, $Host, $iss, $UseNewRunspace.IsPresent)

            # $timer only exists when -TimeoutSeconds was supplied.
            if ($withTimeout = $PSBoundParameters.ContainsKey('TimeoutSeconds')) {
                $timer = [Stopwatch]::StartNew()
            }
        }
        catch {
            $PSCmdlet.ThrowTerminatingError($_)
        }
    }

    process {
        try {
            # Loop until a runspace is available (collecting finished tasks
            # while waiting) or, when a timeout is in effect, until it elapses.
            do {
                if ($runspace = $im.TryGet()) {
                    continue
                }

                if ($withTimeout) {
                    if ($task = $im.WaitAny($TimeoutSeconds, $timer)) {
                        $im.GetTaskResult($task)
                    }
                    continue
                }

                if ($task = $im.WaitAny()) {
                    $im.GetTaskResult($task)
                }
            }
            until ($runspace -or ($withTimeout -and $TimeoutSeconds -lt $timer.Elapsed.TotalSeconds))

            if ($withTimeout -and $TimeoutSeconds -lt $timer.Elapsed.TotalSeconds) {
                if ($runspace) {
                    # The runspace was taken from the pool but will not be
                    # used; put it back so it gets disposed with the manager
                    # instead of being lost.
                    $im.Runspaces.Push($runspace)
                }

                return
            }

            $im.AddTask([PSParallelTask]::new($ScriptBlock, $InputObject, $PSCmdlet).
                AssociateWith($runspace).
                AddUsingStatements($usingParams))
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
    }

    end {
        try {
            # Drain the remaining tasks; the finally block below releases the
            # manager in every case, including the early return.
            if ($withTimeout) {
                while ($task = $im.WaitAny($TimeoutSeconds, $timer)) {
                    $im.GetTaskResult($task)
                }
                return
            }

            while ($task = $im.WaitAny()) {
                $im.GetTaskResult($task)
            }
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
        finally {
            if ($im -is [IDisposable]) {
                $im.Dispose()
            }
        }
    }
}
#EndRegion '.\public\Invoke-Parallel.ps1' 132