Invoke-ProducerConsumer.ps1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
<#PSScriptInfo
.VERSION 1.0
.GUID bfb939b9-03f0-433e-ad0f-e4e12f4a009c
.AUTHOR Lee Holmes
#>


<#
.DESCRIPTION
 Example implementation of producer / consumer parallelism in PowerShell
#>


## The script block we want to run in parallel. Threads will all
## retrieve work from $InputQueue, and send results to $OutputQueue
$parallelScript = {
    param(
        ## An Input queue of work to do
        $InputQueue,
        
        ## The output buffer to write responses to
        $OutputQueue,
        
        ## State tracking, to help threads communicate
        ## how much progress they've made
        $OutputProgress, $ThreadId, $ShouldExit
    )

    ## Continually try to fetch work from the input queue, until
    ## the 'ShouldExit' flag is set
    $processed = 0
    $workItem = $null
    while(! $ShouldExit.Value)
    {
        if($InputQueue.TryDequeue([ref] $workItem))
        {
            ## If we got a work item, do something with it. In this
            ## situation, we just create a string and sleep a bit.
            $workItemResult = "Processing $workItem in thread $ThreadId"
            Start-Sleep -Seconds (Get-Random -Max 3)

            ## Add the result to the output queue
            $OutputQueue.Enqueue($workItemResult)

            ## Update our progress
            $processed++
            $OutputProgress[$ThreadId] = $processed
        }
        else
        {
            ## If there was no work, wait a bit for more.
            Start-Sleep -m 100
        }
    }
}

## Create a set of background PowerShell instances to do work, based on the
## number of available processors.
$threads = Get-WmiObject Win32_Processor | Foreach-Object NumberOfLogicalProcessors
$runspaces = 1..$threads | Foreach-Object { [PowerShell]::Create() }
$outputProgress = New-Object 'Int[]' $threads
$inputQueue = New-Object 'System.Collections.Concurrent.ConcurrentQueue[String]'
$outputQueue = New-Object 'System.Collections.Concurrent.ConcurrentQueue[String]'
$shouldExit = $false

## Spin up each of our PowerShell runspaces. Once invoked, these are actively
## waiting for work and consuming once available.
for($counter = 0; $counter -lt $threads; $counter++)
{   
    $null = $runspaces[$counter].AddScript($parallelScript).
        AddParameter("InputQueue", $inputQueue).
        AddParameter("OutputQueue", $outputQueue).
        AddParameter("OutputProgress", $outputProgress).
        AddParameter("ThreadId", $counter).
        AddParameter("ShouldExit", [ref] $shouldExit).BeginInvoke()
}

## Some fake work - send 50 GUIDs into our worker threads
$estimated = 50
1..$estimated | Foreach-Object {
    $currentInput = New-Guid
    $inputQueue.Enqueue($currentInput)
}

## Wait for our worker threads to complete processing the
## work.
try
{
    do
    {
        ## Update the status of how many items we've processed, based on adding up the
        ## output progress from each of the worker threads
        $totalProcessed = $outputProgress | Measure-Object -Sum | Foreach-Object Sum
        Write-Progress "Processed $totalProcessed of $estimated" -PercentComplete ($totalProcessed * 100 / $estimated)
   
        ## If there were any results, output them.
        $scriptOutput = $null
        while($outputQueue.TryDequeue([ref] $scriptOutput))
        {
            $scriptOutput
        }  

        ## If the threads are done processing the input we gave them, let them know they can exit
        if($inputQueue.Count -eq 0)
        {
            $shouldExit = $true
        }       
        
        Start-Sleep -m 100

        ## See if we still have any busy runspaces. If not, exit the loop.
        $busyRunspaces = $runspaces | Where-Object { $_.InvocationStateInfo.State -ne 'Complete' }
    } while($busyRunspaces)
}
finally
{
    ## Clean up our PowerShell instances
    foreach($runspace in $runspaces)
    {
        $runspace.Stop()
        $runspace.Dispose()
    }
}