# Internal/ForEach-Parallel.ps1
<#
.SYNOPSIS
    Parallel for-each, using PowerShell runspaces.

.DESCRIPTION
    Executes the given script-block once for every item, each invocation in its own PowerShell
    instance that shares a single runspace pool. The current item is available as $_ inside the
    script-block; the values of ArgumentList are passed to it positionally.
    Output is written in the order in which the items were supplied.
    Errors raised by the invocations are collected: a single error is re-thrown as-is, multiple
    errors are wrapped in one AggregateException.

.PARAMETER Items
    The items for which to execute the ScriptBlock. Can be piped in or passed by name.

.PARAMETER ScriptBlock
    The script-block to execute for each item.

.PARAMETER ArgumentList
    The arguments that will be passed to the scriptblock.

.PARAMETER MaxRunspaces
    The maximum number of runspaces (to attempt) to run in parallel.
    The actual number of runspaces executing in parallel is determined by the runtime and is e.g. limited by available cores.
    Default is 16.

.PARAMETER WaitTimeout
    The time to wait for each runspace to complete, in milliseconds.
    Default is 1 hour.
#>
function ForEach-Parallel {
    [Diagnostics.CodeAnalysis.SuppressMessageAttribute('PSUseApprovedVerbs', '', Justification = 'The used verb makes the most sense in this case.')]
    [CmdletBinding()]
    param (
        [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
        [Array]$Items,

        [Parameter(Mandatory = $true)]
        [scriptblock]$ScriptBlock,

        [Parameter(Mandatory = $false)]
        [Object[]]$ArgumentList,

        [Parameter(Mandatory = $false)]
        [int]$MaxRunspaces = 16,

        [Parameter(Mandatory = $false)]
        [int]$WaitTimeout = (60 * 60 * 1000)
    )

    # Materialise the pipeline input exactly once. There is no process-block, so $Input holds
    # everything that was piped in. When nothing was piped (the caller used -Items directly),
    # fall back to the bound parameter so that calling style is processed as well.
    $workItems = @($Input)
    if ($workItems.Count -eq 0 -and $null -ne $Items) {
        $workItems = @($Items)
    }
    if ($workItems.Count -eq 0) {
        return
    }

    # Declared up front so the finally-block and the error handling below can always read them.
    $pool = $null
    $workers = New-Object 'System.Collections.Generic.List[System.Management.Automation.PowerShell]'
    $pending = New-Object 'System.Collections.Generic.List[System.IAsyncResult]'
    $errors = New-Object 'System.Collections.Generic.List[System.Management.Automation.ErrorRecord]'

    try {
        # create the optional argument- & parameter-lists to be used in the script-block;
        # every entry of ArgumentList becomes a numbered parameter ($0, $1, ...)
        $arguments = ''
        $parameters = ''
        if ($ArgumentList) {
            for ($index = 0; $index -lt $ArgumentList.Length; $index++) {
                $arguments += ", `$$index"
                $parameters += " `$$index"
            }
        }

        # create the script-block to be executed
        # - the provided script-block is wrapped, so the provided arguments (ArgumentList) can be
        #   passed along with the current item ($_)
        $scriptText = @"
[CmdletBinding()]
param (`$_$arguments)

function Wrapper {
$ScriptBlock
}

Wrapper $parameters
"@

        $sessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

        # providing the current host makes the output of each runspace show up in the current host
        $pool = [runspacefactory]::CreateRunspacePool(1, $MaxRunspaces, $sessionState, $Host)
        $pool.Open()

        # create a new PowerShell instance for each item
        foreach ($item in $workItems) {
            $worker = [powershell]::Create()
            $worker.RunspacePool = $pool
            $workers.Add($worker)

            # turn anything written to the error-stream into a terminating error, so it surfaces
            # when the result of the invocation is collected
            $worker.Streams.Error.add_DataAdded({
                param (
                    [Object]$streamSender,
                    [System.Management.Automation.DataAddedEventArgs]$addedArgs
                )

                foreach ($record in $streamSender.ReadAll()) {
                    throw "$($record.Exception.Message)"
                }
            })

            # add the generated script-block, passing the current item and optional arguments
            [void]$worker.AddScript($scriptText)
            [void]$worker.AddArgument($item)
            if ($ArgumentList) {
                [void]$worker.AddParameters($ArgumentList)
            }

            # pass the Verbose-parameter
            [void]$worker.AddParameter('Verbose', $VerbosePreference -eq 'Continue')

            # start the invocation asynchronously
            $pending.Add($worker.BeginInvoke())
        }

        # wait for all invocations to finish
        # NOTE(review): the outcome of the wait is ignored, so an invocation that exceeds
        # WaitTimeout is not aborted; EndInvoke below presumably still waits for it - confirm
        # whether the timeout should stop the invocation instead.
        foreach ($asyncResult in $pending) {
            $null = $asyncResult.AsyncWaitHandle.WaitOne($WaitTimeout)
        }

        # retrieve the result of each invocation, in the order the items were supplied
        for ($index = 0; $index -lt $pending.Count; $index++) {
            $asyncResult = $pending[$index]
            $worker = $workers[$index]

            # if needed, the following properties provide details of the completion-status
            # $worker.InvocationStateInfo.State
            # $worker.InvocationStateInfo.Reason
            try {
                Write-Output ($worker.EndInvoke($asyncResult))
            }
            catch {
                # collect each error, so they can be provided as a single error
                $errors.Add($_)
            }
        }
    }
    finally {
        # release every PowerShell instance and the pool, also when something failed half-way
        foreach ($worker in $workers) {
            $worker.Dispose()
        }

        if ($pool) {
            $pool.Close()
            $pool.Dispose()
        }
    }

    # handle the error(s)
    if ($errors.Count -eq 1) {
        throw $errors[0]
    }
    elseif ($errors.Count -gt 1) {
        $exceptions = [exception[]]@($errors | ForEach-Object { $_.Exception })
        throw (New-Object AggregateException -ArgumentList "One or more errors occurred:`n$([string]::Join("`n", $exceptions.Message))",$exceptions)
    }
}