script/tools/public/Invoke-CbsRunspace.ps1

Function Invoke-CbsRunspace {
    <#
    .SYNOPSIS
    Runs a script block once per input object, in parallel, on a runspace pool.

    .DESCRIPTION
    Every input object is handed to its own PowerShell instance that executes
    ScriptBlock on a shared runspace pool. Inside the script block the current
    item is available as $_. Output written by the script blocks is collected in
    one shared buffer and emitted to the pipeline as it becomes available, so the
    output order is not tied to the input order.

    The runspaces are seeded with:
      * the entries of SharedVariables (as variables), plus a $Lock object,
      * an Invoke-WithLock function that runs a script block while holding $Lock,
      * every function visible to the caller that has no Source, plus Commands,
      * the modules given by Modules and, with -AutoImports, the loaded modules
        that do not live below a PSModulePath directory,
      * the caller's *Preference variables.

    .PARAMETER InputObject
    The objects to process; one job is started per object.

    .PARAMETER ScriptBlock
    The code to run for each object. It is re-created from its text inside the
    runspace, so it does not capture variables from the calling scope.

    .PARAMETER SharedVariables
    Name/value pairs to define as variables in every runspace. The values are
    passed by reference; the hashtable itself is not modified.

    .PARAMETER AutoImports
    Also import the currently loaded modules that are not located below one of
    the PSModulePath directories.

    .PARAMETER MaxThreads
    Maximum number of runspaces in the pool, and therefore of concurrent jobs.

    .PARAMETER Commands
    Additional functions to define in the runspaces.

    .PARAMETER Modules
    Additional modules to import into the runspaces.

    .EXAMPLE
    1..10 | Invoke-CbsRunspace { $_ * 2 } -MaxThreads 4
    #>
    Param (
        [Parameter(Mandatory, ValueFromPipeline)][PSObject[]]$InputObject,
        [Parameter(Mandatory, Position = 0)][ScriptBlock]$ScriptBlock,
        [Parameter()][HashTable]$SharedVariables = @{},
        [Parameter()][Switch]$AutoImports,
        [Parameter()][ValidateRange(1, [int]::MaxValue)][int]$MaxThreads = 30,
        [Parameter()][System.Management.Automation.FunctionInfo[]]$Commands = @(),
        [Parameter()][System.Management.Automation.PSModuleInfo[]]$Modules = @()
    )

    Begin {
        Write-Verbose "Begin $($MyInvocation.InvocationName)"
        [System.Management.Automation.Runspaces.InitialSessionState]$sessionState =
            [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

        # Prefer a module's manifest (.psd1) over its script module (.psm1) when the manifest exists.
        # Kept as a script block (not a function) so it is not picked up by the function import below.
        $resolveModulePath = {
            Param([string]$ModulePath)
            $manifest = $ModulePath -replace "psm1$", "psd1"
            if (Test-Path $manifest) {
                $manifest
            } else {
                $ModulePath
            }
        }

        if ($AutoImports) {
            # Use the platform's separators: PSModulePath is ';'-separated on Windows and
            # ':'-separated elsewhere, and Windows paths contain both ':' and '\'.
            $directorySeparators = [char[]]@(
                [System.IO.Path]::DirectorySeparatorChar,
                [System.IO.Path]::AltDirectorySeparatorChar
            )
            $moduleRoots = @(
                "$env:PSModulePath".Split([System.IO.Path]::PathSeparator) |
                    ForEach-Object { $_.TrimEnd($directorySeparators) } |
                    Where-Object { $_ }   # an empty entry would otherwise match every path
            )

            [string[]]$modulePaths = Get-Module |
                Where-Object {
                    $modulePath = $_.Path
                    if (-not $modulePath) { return $false }

                    # Modules below a PSModulePath directory are left out of the explicit import list.
                    foreach ($root in $moduleRoots) {
                        foreach ($separator in $directorySeparators) {
                            if ($modulePath.StartsWith("$root$separator", [System.StringComparison]::OrdinalIgnoreCase)) {
                                return $false
                            }
                        }
                    }
                    return $true
                } |
                Select-Object -ExpandProperty Path |
                ForEach-Object { & $resolveModulePath $_ }

            foreach ($path in $modulePaths) {
                Write-Verbose "Importing module $($path)"
                $sessionState.ImportPSModule($path)
            }
        }

        if ($Modules) {
            $Modules |
                Select-Object -ExpandProperty Path |
                ForEach-Object { & $resolveModulePath $_ } |
                ForEach-Object {
                    Write-Verbose "Importing module $_"
                    $sessionState.ImportPSModule($_)
                }
        }

        # Work on a shallow copy so the caller's hashtable does not gain a "Lock" entry.
        # The values themselves are still shared by reference with the runspaces.
        $sharedState = if ($null -ne $SharedVariables) { $SharedVariables.Clone() } else { @{} }
        $sharedState["Lock"] = New-Object System.Object

        foreach ($key in @($sharedState.Keys)) {
            $sessionState.Variables.Add(
                [System.Management.Automation.Runspaces.SessionStateVariableEntry]::new([string]$key, $sharedState[$key], $null)
            )
        }

        Function Invoke-WithLock {
            Param([Parameter(Mandatory)][ScriptBlock]$ScriptBlock)
            [System.Threading.Monitor]::Enter($Lock)
            try {
                & $ScriptBlock
            } finally {
                # Always release the lock, otherwise one failing script block
                # would block every other runspace waiting on $Lock.
                [System.Threading.Monitor]::Exit($Lock)
            }
        }

        $lockFunction = Get-Content Function:\Invoke-WithLock
        $sessionFunction = [System.Management.Automation.Runspaces.SessionStateFunctionEntry]::new("Invoke-WithLock", $lockFunction)
        $sessionState.Commands.Add($sessionFunction)

        # Import all unscoped functions into the runspace, this will let us pretend we are in the
        # same scope in terms of functions. This renders the "Commands" parameter somewhat redundant.
        $unscopedFunctions = @(Get-ChildItem Function:\ | Where-Object { !$_.Source })
        foreach ($command in ($unscopedFunctions + $Commands)) {
            Write-Verbose "Importing command $($command.Name)"
            $sessionState.Commands.Add(
                [System.Management.Automation.Runspaces.SessionStateFunctionEntry]::new($command.Name, $command.Definition)
            )
        }

        Write-Verbose "Creating runspace pool with [$maxThreads] runspaces"
        $runspacePool = [runspacefactory]::CreateRunspacePool(1, $maxThreads, $sessionState, $Host)
        $runspacePool.Open()

        # jobs: the PowerShell instances still in flight; runningJobs: how many of them are counted as running.
        $data = @{
            jobs = @()
            runningJobs = 0
        }

        Write-Verbose "Importing preferences into runspaces"
        # When called from a module a function will not have the preference variables set.
        # We can retrieve them from $PSCmdlet.SessionState
        $preferences = @(
            Get-Variable | Where-Object Name -like *Preference | ForEach-Object {
                $variable = $PSCmdlet.SessionState.PSVariable.Get($_.Name)

                # Nothing to carry over when the caller's session state has no value for it.
                if ($null -eq $variable -or $null -eq $variable.Value) { return }

                if ($variable.Value -is [bool]) {
                    # Renders as e.g. "$WhatIfPreference = $False".
                    '${0} = ${1}' -f $_.Name, $variable.Value
                } else {
                    # Single-quoted literal with doubled quotes, so the value is never
                    # expanded or able to terminate the string in the generated script.
                    "`${0} = '{1}'" -f $_.Name, ("$($variable.Value)" -replace "'", "''")
                }
            }
        ) -join "`n"

        # The script block is rebuilt from text: a param($_) header, the preference
        # assignments, then the caller's code.
        $parameterizedScriptBlock = [ScriptBlock]::Create("param(`$_)`n$preferences`n$ScriptBlock")

        Write-Verbose "Created script block:`n$($parameterizedScriptBlock.ToString())"

        # The jobs take no pipeline input, so the input buffer is completed while still empty.
        $inBuffer = [System.Management.Automation.PSDataCollection[PSObject]]::new()
        $inBuffer.Complete()
        # All jobs write into this single output buffer.
        $outBuffer = [System.Management.Automation.PSDataCollection[PSObject]]::new()

        Function _ProcessJobs {
            # Remove all finished jobs from the jobs list, store the results and clean up.
            $data.jobs = @($data.jobs | ForEach-Object {
                if ($_.Handle.IsCompleted) {
                    # If the job crashes the thread object may be $null.
                    # Simply remove it from the list and keep going if this happens.
                    if ($_.Thread) {
                        $_.Thread.EndInvoke($_.Handle) | Out-Null
                        $_.Thread.Dispose()
                        $_.Thread = $Null
                        $_.Handle = $Null
                    }

                    $data.runningJobs--
                }
                else {
                    $_ | Write-Output
                }
            })

            # This outputs the result of all the completed jobs so far.
            if ($outBuffer.Count -gt 0) {
                $outBuffer.ReadAll() | Write-Output
            }

            # Short pause so the polling loops that call this do not spin at full speed.
            Start-Sleep -Milliseconds 10
        }
    }

    Process {
        foreach ($item in $InputObject) {
            Write-Verbose "Process $item $($MyInvocation.InvocationName)"

            # Wait for a free slot BEFORE counting the new job. Counting it first would make
            # -MaxThreads 1 wait forever on a job that was never started, and would cap the
            # concurrency at MaxThreads - 1 otherwise.
            while ($data.runningJobs -ge $MaxThreads) {
                _ProcessJobs
            }

            $powerShellThread = [powershell]::Create().AddScript($parameterizedScriptBlock, $True)

            [void]$powerShellThread.AddParameter("_", $item)

            $powerShellThread.RunspacePool = $runspacePool
            $handle = $powerShellThread.BeginInvoke($inBuffer, $outBuffer)

            $data.jobs += [PSCustomObject]@{
                Handle = $handle;
                Thread = $powerShellThread
            }
            # Only count the job once it has actually been started and recorded.
            $data.runningJobs++
        }
    }

    End {
        try {
            Write-Verbose "End-Start $($MyInvocation.InvocationName)"

            # Wait for all threads to complete.
            while (($data.jobs | Measure-Object).Count -ne 0) {
                _ProcessJobs
            }
        } finally {
            # Flush whatever output is still buffered.
            if ($null -ne $outBuffer -and $outBuffer.Count -gt 0) {
                $outBuffer.ReadAll() | Write-Output
            }

            # Clean up after ourselves. The null checks keep the cleanup itself from
            # failing when the Begin block did not get as far as creating these objects.
            if ($null -ne $runspacePool) {
                $runspacePool.Close()
                $runspacePool.Dispose()
            }
            if ($null -ne $inBuffer) { $inBuffer.Dispose() }
            if ($null -ne $outBuffer) { $outBuffer.Dispose() }
            Write-Verbose "End-End $($MyInvocation.InvocationName)"
        }
    }
}