lib/Classes/Public/TMBroker.ps1
class TMBroker { #region Non-Static Properties [TMBrokerSetting]$Settings [TMTask]$Task [TMBrokerSubject]$Init [TMSession]$TMSession [TMBrokerEventData]$EventData [TMBrokerStatus]$BrokerStatus [Object]$Cache [System.Int64]$WorkflowTaskCount [System.Collections.Generic.List[System.Collections.Generic.List[TMBrokerSubject]]]$Subjects [System.Collections.Generic.List[TMBrokerSubject]]$ServiceSubjects #endregion Non-Static Properties #region Constructors # Initializes an empty TMBroker object with default settings TMBroker() { $this.Settings = [TMBrokerSetting]::new() $this.BrokerStatus = [TMBrokerStatus]::new() } TMBroker([String]$_type, [String]$_taskProperty, [String[]]$_matchingCriteria) { $this.Settings = [TMBrokerSetting]::new($_type, $_taskProperty, $_matchingCriteria) $this.BrokerStatus = [TMBrokerStatus]::new() } TMBroker( [String]$_type, [String]$_taskProperty, [String[]]$_matchingCriteria, [Int]$_timeout, [Int]$_pauseSeconds ) { $this.Settings = [TMBrokerSetting]::new($_type, $_taskProperty, $_matchingCriteria, $_timeout, $_pauseSeconds) $this.BrokerStatus = [TMBrokerStatus]::new() } TMBroker( [String]$_type, [String]$_taskProperty, [String[]]$_matchingCriteria, [Int]$_timeout, [Int]$_pauseSeconds, [bool]$_parallel, [int]$_throttle ) { $this.Settings = [TMBrokerSetting]::new($_type, $_taskProperty, $_matchingCriteria, $_timeout, $_pauseSeconds, $_parallel, $_throttle) $this.BrokerStatus = [TMBrokerStatus]::new() } #endregion Constructors #region Non-Static Methods <# Method: GetEventData Description: Loads the EventData property using this object's TMSession Parameters: None #> [void]GetEventData() { if (-not $this.TMSession) { throw 'A TM Session is required to invoke this method' } $this.EventData = [TMBrokerEventData]::new() $this.EventData.GetEventData( $this.TMSession.UserContext.project.id, $this.TMSession.UserContext.event.name, $this.TMSession.Name ) } <# Method: GetTaskData Description: Loads all of the broker-related tasks Parameters: TaskId - The Id of broker task #> [void]GetTaskData($TaskId) { if (-not $this.EventData) { throw 'Event data must be loaded before invoking this method' } # Store this Broker Task's data $this.Task = ($this.EventData.Tasks | Where-Object { $_.Id -eq $TaskId }) # Determine if there is an init cache Task $this.GetInitTask() # Get all of the subject tasks $this.GetSubjectTasks() # $this.GetSubjectTasksByTitle() } <# Method: GetInitTask Description: Gets the init task, if present, from the Event's task data Parameters: None #> [void]GetInitTask() { if ($this.Settings.SubjectScope.Type -eq 'Inline') { if (-not $this.Task) { throw 'Broker Task data must be loaded before invoking this method' } $InitTask = ( $this.EventData.Tasks | Where-Object { ($_.Id -in $this.Task.Successors.TaskId) -and ($_."$($this.Settings.SubjectScope.TaskProperty)" -match $this.Settings.SubjectScope.MatchRegexString) } ) if ($InitTask) { $this.Init = [TMBrokerSubject]::new($InitTask, ($this.EventData.Actions | Where-Object Id -EQ $InitTask.Action.Id), $null) } } } <# Method: GetSubjectTasks Description: Gets all of the subject tasks that will be managed by the broker from the Event's task data Parameters: None #> [void]GetSubjectTasks() { switch ($this.Settings.SubjectScope.Type) { 'Inline' { if (-not $this.Task -and -not $this.Init) { throw 'Broker Task or Init Task data must be loaded before invoking this method' } # Initialize the subjects list $this.Subjects = [System.Collections.Generic.List[System.Collections.Generic.List[TMBrokerSubject]]]::new() foreach ($TaskId in ($this.Init.Task.Successors.TaskId ?? $this.Task.Successors.TaskId)) { # Initialize a list to hold all of the subject tasks for a specific asset $Workflow = [System.Collections.Generic.List[TMBrokerSubject]]::new() # Find the first/direct successor subject task $SubjectTask = $this.EventData.Tasks | Where-Object { $_.Id -eq $TaskId -and ($_."$($this.Settings.SubjectScope.TaskProperty)" -match $this.Settings.SubjectScope.MatchRegexString) } $i = 0 while ($SubjectTask) { $i++ # Add the subject task data to the workflow $Workflow.Add([TMBrokerSubject]::new($SubjectTask, ($this.EventData.Actions | Where-Object Id -EQ $SubjectTask.Action.Id), $i)) # Look for the next subject task in the workflow $SubjectTask = $this.EventData.Tasks | Where-Object { $_.Id -eq $SubjectTask.Successors.TaskId -and ($_."$($this.Settings.SubjectScope.TaskProperty)" -match $this.Settings.SubjectScope.MatchRegexString) } } # Record how many tasks are in each asset's workflow $this.WorkflowTaskCount = $i # Add this workflow to the list of subjects $this.Subjects.Add($Workflow) } } 'Service' { # Initialize the subjects list $this.ServiceSubjects = [System.Collections.Generic.List[TMBrokerSubject]]::new() $ServiceSubjectTasks = $this.EventData.Tasks | Where-Object { ($_."$($this.Settings.SubjectScope.TaskProperty)" -match $this.Settings.SubjectScope.MatchRegexString) -and ($_.id -ne $Broker.task.id ) -and ($_.Action.Id -ne 0) -and ($_.Action.name -notlike '*broker*') } foreach ($Task in $ServiceSubjectTasks) { $this.ServiceSubjects.Add( [TMBrokerSubject]::new($Task, ($this.EventData.Actions | Where-Object Id -EQ $Task.Action.Id), $null) ) } } Default { Write-Host 'Total Service Subjects:'$this.ServiceSubjects.Count } } } <# Method: PopulateCache Description: Invokes the Init Task's Action to fill the cache Parameters: None #> [void]PopulateCache() { if (-not $this.Init) { throw 'Init Task data must be loaded before invoking this method' } $this.Init.Invoke($this.TMSession.Name) } <# Method: RefreshTaskStatuses Description: Updates each Task's status using fresh data from TM Parameters: None #> [void]RefreshTaskStatuses() { $TaskNumbers = [Array]@( $this.Subjects.Task.Number $this.ServiceSubjects.Task.Number $this.Task.TaskNumber $this.Init.Task.Number ) | Where-Object { $_ -gt 0 } $Statement = "find Task by 'taskNumber' ate '$($TaskNumbers -join '|')' fetch 'id', 'status'" $TaskStatuses = Invoke-TMQLStatement -TMSession $this.TMSession.Name -Statement $Statement $this.Task.Status = ($TaskStatuses | Where-Object Id -EQ $this.Task.Id).Status if ($this.Init) { $this.Init.Task.Status = ($TaskStatuses | Where-Object Id -EQ $this.Init.Id).Status } switch ($this.Settings.SubjectScope.Type) { 'Inline' { foreach ($Workflow in $this.Subjects) { foreach ($Subject in $Workflow) { $Subject.Task.Status = ($TaskStatuses | Where-Object Id -EQ $Subject.Task.Id).Status } } } 'Service' { foreach ($Subject in $this.ServiceSubjects) { $SubjectTaskItem = $TaskStatuses | Where-Object Id -EQ $Subject.Task.Id $Subject.Task.Status = $SubjectTaskItem.Status ## Review Task states to update throttling settings if ($this.Settings.Parallel) { ## Handle updating Subjet Task data based on the status of the task switch ($Subject.Task.Status) { 'Started' { ## Mark the Action as Started so the Broker ignores it for next time $Subject.Action.ExecutionStatus = 'Started' } 'Completed' { ## check to ensure the broker does not believe it's running it if ($this.BrokerStatus.ActiveSubjects -contains $Subject.task.Id) { [void]($this.BrokerStatus.ActiveSubjects.Remove($Subject.task.Id)) $Subject.Action.ExecutionStatus = 'Successful' } } } } } } } } [void]Run() { $TotalSubjectTasks = $this.Subjects.Task.Id.Count + $this.ServiceSubjects.Count Write-Progress -Id 1 -ParentId 0 -Activity 'Subject Tasks' -Status "$TotalSubjectTasks remaining tasks" -PercentComplete 0 Write-Progress -Id 2 -ParentId 0 -Activity 'Timeout' -Status "$($this.Settings.Timing.TimeoutMinutes) minutes left" -PercentComplete 0 $this.Settings.Timing.Timer.Start() $this.RefreshTaskStatuses() $TasksExecutedSinceRefresh = 0 while ( (($this.Settings.Timing.Timer.ElapsedMilliseconds / 60000) -lt $this.Settings.Timing.TimeoutMinutes) -and ( ($this.Subjects | Where-Object { $_.Action.ExecutionStatus -eq 'Pending' }) -or ($this.ServiceSubjects | Where-Object { $_.Action.ExecutionStatus -eq 'Pending' }) ) ) { ## Saftey Check the broker task status in TM, exit if the task status is not Started if ($this.Task.Status -ne 'Started') { throw 'The status of the Broker Task has changed outside of TMConsole' } ## Force a refresh after a few tasks being executed if ($TasksExecutedSinceRefresh -ge 3) { $this.RefreshTaskStatuses() $TasksExecutedSinceRefresh = 0 } else { $TasksExecutedSinceRefresh++ } switch ($this.Settings.SubjectScope.Type) { ## Inline Brokers run a workflow step worth of tasks at once 'Inline' { foreach ($Workflow in $this.Subjects) { $Subject = $Workflow | Where-Object { $_.Task.Status -ne 'Completed' -and $_.Action.ExecutionStatus -eq 'Pending' } | Sort-Object Order | Select-Object -First 1 if ($Subject) { $Subject.Invoke($this.TMSession.Name, $this.Cache) } } } ## Service Brokers run one task at a time, when they become ready 'Service' { ## Get the most preferred actionable subject $PreferredActionableSubject = $this.ServiceSubjects | Where-Object { $_.Task.Status -eq 'Ready' -and $_.Action.ExecutionStatus -eq 'Pending' } | Sort-Object { $_.Task."$($this.Settings.ExecutionOrder)" } | Select-Object -First 1 ## Invoke the Most Preferred, Actionable Subject if ($PreferredActionableSubject) { ## Update the local cache so this task won't run again until another refresh from TM $PreferredActionableSubject.Task.Status = 'Started' ## Run a Subject in a normal invocation runspace, but track that task so it 'consumes' one runspace if ($this.Settings.Parallel) { ## Honor Throttling settings if ($this.BrokerStatus.ActiveSubjects.Count -lt $this.Settings.Throttle) { ## Record the Task ID as belonging to this broker for Throttling [void]($this.BrokerStatus.ActiveSubjects.Add($PreferredActionableSubject.task.id)) Write-Verbose "Invoking Task $($PreferredActionableSubject.task.id)" $PreferredActionableSubject.InvokeParallel($this.TMSession.Name, $this.Cache) } } else { ## Invoke this ActionRequest directly, in this runspace $PreferredActionableSubject.Invoke($this.TMSession.Name, $this.Cache) } } } } ## Count the remaining tasks $RemainingTaskCount = ($this.Subjects | Where-Object { $_.Task.Status -ne 'Completed' -and $_.Action.ExecutionStatus -eq 'Pending' }).Count $RemainingTaskCount += ($this.ServiceSubjects | Where-Object { $_.Task.Status -ne 'Completed' -and $_.Action.ExecutionStatus -eq 'Pending' }).Count $ProgressSplat = @{ Id = 1 ParentId = 0 Activity = 'Subject Tasks' Status = "$RemainingTaskCount remaining tasks" PercentComplete = ((($TotalSubjectTasks - $RemainingTaskCount) / $TotalSubjectTasks) * 100) } Write-Progress @ProgressSplat $ProgressSplat = @{ Id = 2 ParentId = 0 Activity = 'Timeout' Status = "$($this.Settings.Timing.TimeoutMinutes) minutes left" PercentComplete = ((($this.Settings.Timing.Timer.ElapsedMilliseconds / 60000) / $this.Settings.Timing.TimeoutMinutes) * 100) } Write-Progress @ProgressSplat if ($this.Settings.Parallel) { $ProgressSplat = @{ Id = 3 ParentId = 0 Activity = 'Throttle' Status = "$($this.BrokerStatus.ActiveSubjects.Count) of $($this.Settings.Throttle)" PercentComplete = [Math]::Ceiling(((($this.BrokerStatus.ActiveSubjects.Count) / $this.Settings.Throttle) * 100)) } Write-Progress @ProgressSplat } ## Sleep, unless there are more tasks ready if ($this.ServiceSubjects.Task.Status -notcontains 'Ready') { Start-Sleep -Seconds $this.Settings.Timing.PauseSeconds ## Set the Tasks Executed since Refresh above the limit to force a refresh after the sleep $TasksExecutedSinceRefresh = 100 } } } #endregion Non-Static Methods } class TMBrokerEventData { [TMEvent]$Event [TMTask[]]$Tasks [PSCustomObject[]]$Actions TMBrokerEventData() {} TMBrokerEventData([Int]$ProjectId, [String]$EventName, [String]$TMSession) { $this.GetEventData($ProjectId, $EventName, $TMSession) } [void]GetEventData([Int]$ProjectId, [String]$EventName, [String]$TMSession) { # Get the Event object $this.Event = Get-TMEvent -TMSession $TMSession -ProjectId $ProjectId -Name $EventName # Get all of the broker-related Tasks in the Event $TaskSplat = @{ TMSession = $TMSession ProjectId = $ProjectId TMEvent = $this.Event Api = $false } $this.Tasks = Get-TMTask @TaskSplat # Get all of the Actions of the broker-related Tasks $this.Actions = Get-TMAction -TMSession $TMSession -Id ( $this.Tasks.Action.Id | Where-Object { $_ -gt 0 } | Sort-Object -Unique ) } } class TMBrokerSubjectScope { [ValidateSet('Service', 'Inline')] [String]$Type [ValidateSet('Title', 'Category', 'Team', 'Action')] [String]$TaskProperty [String[]]$MatchingCriteria hidden [String]$MatchRegexString static $ValidTypes = @('Service', 'Inline') static $ValidTaskProperties = @('Title', 'Category', 'Team') TMBrokerSubjectScope() { $this.Type = 'Inline' $this.TaskProperty = 'Title' $this.MatchingCriteria = @('\[Subject\]') $this.MatchRegexString = $this.GetMatchingString() } TMBrokerSubjectScope([String]$_type, [String]$_taskProperty, [String[]]$_matchingCriteria) { $this.Type = $_type $this.TaskProperty = $_taskProperty $this.MatchingCriteria = $_matchingCriteria $this.MatchRegexString = $this.GetMatchingString() } [String]GetMatchingString() { return ('(' + ($this.MatchingCriteria -join ')|(') + ')') } } class TMBrokerSetting { [TMBrokerSubjectScope]$SubjectScope [TMBrokerTiming]$Timing [ValidateSet('TaskNumber', 'Score')] [String]$ExecutionOrder [bool]$Parallel = $false [Int]$Throttle = 8 TMBrokerSetting() { $this.Timing = [TMBrokerTiming]::new() $this.SubjectScope = [TMBrokerSubjectScope]::new() $this.ExecutionOrder = 'TaskNumber' } TMBrokerSetting( [String]$_type, [String]$_taskProperty, [String[]]$_matchingCriteria, [Int]$_timeout, [Int]$_pauseSeconds ) { $this.SubjectScope = [TMBrokerSubjectScope]::new($_type, $_taskProperty, $_matchingCriteria) $this.Timing = [TMBrokerTiming]::new($_timeout, $_pauseSeconds) $this.ExecutionOrder = 'TaskNumber' } TMBrokerSetting( [String]$_type, [String]$_taskProperty, [String[]]$_matchingCriteria, [Int]$_timeout, [Int]$_pauseSeconds, [bool]$_parallel, [int]$_throttle ) { $this.SubjectScope = [TMBrokerSubjectScope]::new($_type, $_taskProperty, $_matchingCriteria) $this.Timing = [TMBrokerTiming]::new($_timeout, $_pauseSeconds) $this.ExecutionOrder = 'TaskNumber' $this.Parallel = $_parallel $this.Throttle = $_throttle } TMBrokerSetting( [String]$_type, [String]$_taskProperty, [String[]]$_matchingCriteria ) { $this.SubjectScope = [TMBrokerSubjectScope]::new($_type, $_taskProperty, $_matchingCriteria) $this.Timing = [TMBrokerTiming]::new() $this.ExecutionOrder = 'TaskNumber' } } class TMBrokerTiming { [System.Int64]$TimeoutMinutes [System.Int64]$PauseSeconds [System.Diagnostics.Stopwatch]$Timer TMBrokerTiming () { $this.TimeoutMinutes = 120 $this.PauseSeconds = 15 $this.Timer = [System.Diagnostics.Stopwatch]::new() } TMBrokerTiming ([System.Int64]$_timeoutMinutes, [System.Int64]$_pauseSeconds) { $this.TimeoutMinutes = $_timeoutMinutes $this.PauseSeconds = $_pauseSeconds $this.Timer = [System.Diagnostics.Stopwatch]::new() } } class TMBrokerStatus { [System.Collections.Generic.List[Int64]]$ActiveSubjects TMBrokerStatus () { $this.ActiveSubjects = [System.Collections.Generic.List[Int64]]::new() } } |