Internal/ForEach-Parallel.ps1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
<#
.SYNOPSIS
    Parallel for-each, using PowerShell runspaces.
.DESCRIPTION
    Parallel for-each, using PowerShell runspaces.
 
.PARAMETER Items
    The items for which to execute the ScripBlock.
.PARAMETER ScripBlock
    The script-block to execute for each item.
.PARAMETER ArgumentList
    The arguments that will be passed to the scriptblock.
.PARAMETER MaxRunspaces
    The maximum number of runspaces (to attempt) to run in parallel.
    The actual number of runspaces executing in parallel is determined by the runtime and is e.g. limited by available cores.
    Default is 16.
.PARAMETER WaitTimeout
    The time to wait for each runspace to complete, in milliseconds.
    Default is 1 hour.
#>

function ForEach-Parallel {
    [Diagnostics.CodeAnalysis.SuppressMessageAttribute('PSUseApprovedVerbs', '', Justification = 'The used verb makes the most sense in this case.')]
    [CmdletBinding()]
    param (
        [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
        [Array]$Items,

        [Parameter(Mandatory = $true)]
        [scriptblock]$ScriptBlock,

        [Parameter(Mandatory = $false)]
        [Object[]]$ArgumentList,

        [Parameter(Mandatory = $false)]
        [int]$MaxRunspaces = 16,

        [Parameter(Mandatory = $false)]
        [int]$WaitTimeout = (60 * 60 * 1000)
    )

    if ($Input) {
        try {
            # create the optional argument- & parameter-lists to be used in the script-block
            $arguments = ''
            $parameters = ''
            if ($ArgumentList) {
                for ($index = 0; $index -lt $ArgumentList.Length; $index++) {
                    $arguments += ", `$$index"
                    $parameters += " `$$index"
                }
            }

            # create the script-block to be executed
            # - the provided script-block is wrapped, so the provided arguments (ArgumentList) can be passed along with the current item ($_)
            # - the current module is always loaded
            $scriptText = 
@"
[CmdletBinding()]
param (`$_$arguments)
 
function Wrapper {
$ScriptBlock
}
 
Wrapper $parameters
"@


            $sessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

            # providing the current host makes the output of each runspace show up in the current host
            $pool = [runspacefactory]::CreateRunspacePool(1, $MaxRunspaces, $sessionState, $Host)
            $pool.Open()

            # create a new runspace for each item
            $runspaces = @()
            $asyncResults = @()
            $exceptions = @()
            foreach ($item in $Input) {
                $runspace = [powershell]::create()
                $runspace.RunSpacePool = $pool
                $runspaces += $runspace

                $runspace.Streams.Error.add_DataAdded({
                    Param (
                        [Object]$sender,
                        [System.Management.Automation.DataAddedEventArgs]$e
                    )

                    foreach ($item in $sender.ReadAll()) {
                        throw "$($item.Exception.Message)"
                    }
                })

                # add the generated script-block, passing the current item and optional arguments
                [void]$runspace.AddScript($scriptText)
                [void]$runspace.AddArgument($item)
                if ($ArgumentList) {
                    [void]$runspace.AddParameters($ArgumentList)
                }
                # pass the Verbose-parameter
                [void]$runspace.AddParameter('Verbose', $VerbosePreference -eq 'Continue')

                # start the runspace synchronously
                $asyncResult = $runspace.BeginInvoke()
                $asyncResults += $asyncResult
            }

            # wait for all runspaces to finish
            for ($index = 0; $index -lt $asyncResults.Length; $index++) {  
                $null = [System.Threading.WaitHandle]::WaitAll($asyncResults[$index].AsyncWaitHandle, $WaitTimeout)
            }

            # retrieve the result of each runspace
            $errors = @()
            for ($index = 0; $index -lt $asyncResults.Length; $index++) {  
                $asyncResult = $asyncResults[$index]
                $runspace = $runspaces[$index]

                # if needed, the following properties provide details of the runspace completion-status
                # $runspace.InvocationStateInfo.State
                # $runspace.InvocationStateInfo.Reason

                try {
                    Write-Output ($runspace.EndInvoke($asyncResult))
                }
                catch {
                    # collect each error, so they can be provided as a single error
                    $errors += $_
                }  
            }
        }
        finally {
            if ($pool) {
                $pool.Close()
            }
        }

        # handle the error(s)
        if ($errors) {
            if($errors.Length -eq 1) {
                throw $errors[0]
            }
            else {
                $exceptions = [exception[]]($errors).Exception
                throw (New-Object AggregateException -ArgumentList "One or more errors occurred:`n$([string]::Join("`n", $exceptions.Message))",$exceptions)
            }
        }
    }
}