Private/PayloadEvents.ps1
|
function Remove-PayloadEventsByCutoff {
    <#
    .SYNOPSIS
        Drops events at or before a cutoff timestamp from hypervisor payloads.

    .DESCRIPTION
        Walks every payload -> data item -> event and rebuilds the payloads
        without the events whose start_time is less than or equal to CutoffUtc,
        so that data sent by an earlier run is not sent again.

        Rules applied while walking the structure:
          * A payload whose data is missing or empty is passed through unchanged.
          * A data item whose events are missing or empty is passed through
            unchanged; a null data item is skipped.
          * A null event, or an event whose start_time is null/blank, is always
            dropped (with or without a cutoff) and is not counted in the log.
          * When a cutoff is supplied, an event is dropped only if
            ConvertFrom-RfcUtcTimestampOrNull yields a value for its start_time
            and that value is <= CutoffUtc. Every other event is kept.
          * A data item left without events, and a payload left without data
            items, is omitted from the output.

        Rebuilt data items carry over host plus the surviving events. Rebuilt
        payloads carry over schema_version, source, customer_environment and
        version. If at least one event was removed by the cutoff, the count is
        logged through Write-CustomLog with severity INFO.

    .PARAMETER Payloads
        The VSphereHypervisorPayload objects to process. May be null or empty.

    .PARAMETER CutoffUtc
        Events with start_time <= this timestamp are removed. When null, no
        cutoff comparison is made (events with a blank start_time are still
        dropped).

    .OUTPUTS
        VSphereHypervisorPayload objects containing only the surviving events.
    #>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory = $true)]
        [AllowEmptyCollection()]
        [AllowNull()]
        [VSphereHypervisorPayload[]]$Payloads,

        [Parameter(Mandatory = $false)]
        [Nullable[DateTime]]$CutoffUtc = $null
    )

    # Nothing to process: emit an empty result.
    if (-not ($Payloads -and $Payloads.Count -ne 0)) {
        return @()
    }

    # The cutoff never changes during the walk, so test it for null only once.
    $hasCutoff = $null -ne $CutoffUtc
    $removedCount = 0
    $keptPayloads = [System.Collections.Generic.List[VSphereHypervisorPayload]]::new()

    foreach ($currentPayload in $Payloads) {
        # Payloads without data are forwarded untouched.
        $payloadHasData = $currentPayload.data -and $currentPayload.data.Count -ne 0
        if (-not $payloadHasData) {
            $keptPayloads.Add($currentPayload)
            continue
        }

        $keptItems = [System.Collections.Generic.List[VSphereHypervisorDataItem]]::new()

        foreach ($item in $currentPayload.data) {
            # Data items without events are forwarded untouched; null items are skipped.
            $itemHasEvents = $item -and $item.events -and $item.events.Count -ne 0
            if (-not $itemHasEvents) {
                if ($item) {
                    $keptItems.Add($item)
                }
                continue
            }

            $keptEvents = [System.Collections.Generic.List[VSphereHypervisorEvent]]::new()

            foreach ($candidate in $item.events) {
                # Null events and events without a start_time are dropped silently.
                if (-not $candidate) {
                    continue
                }
                if ([string]::IsNullOrWhiteSpace($candidate.start_time)) {
                    continue
                }

                # An event counts as already sent only when its timestamp
                # converts to a value and that value is at or before the cutoff.
                $alreadySent = $false
                if ($hasCutoff) {
                    $startedAt = ConvertFrom-RfcUtcTimestampOrNull -Value $candidate.start_time
                    $alreadySent = $startedAt -and ($startedAt -le $CutoffUtc)
                }

                if ($alreadySent) {
                    $removedCount++
                }
                else {
                    $keptEvents.Add($candidate)
                }
            }

            # A data item whose events were all removed is left out entirely.
            if ($keptEvents.Count -eq 0) {
                continue
            }

            $rebuiltItem = [VSphereHypervisorDataItem]::new()
            $rebuiltItem.host = $item.host
            $rebuiltItem.events = @($keptEvents)
            $keptItems.Add($rebuiltItem)
        }

        # A payload whose data items were all removed is left out entirely.
        if ($keptItems.Count -eq 0) {
            continue
        }

        $rebuiltPayload = [VSphereHypervisorPayload]::new()
        $rebuiltPayload.schema_version = $currentPayload.schema_version
        $rebuiltPayload.source = $currentPayload.source
        $rebuiltPayload.customer_environment = $currentPayload.customer_environment
        $rebuiltPayload.version = $currentPayload.version
        $rebuiltPayload.data = @($keptItems)
        $keptPayloads.Add($rebuiltPayload)
    }

    if ($removedCount -gt 0) {
        Write-CustomLog -Message "Filtered out $removedCount already-sent events" -Severity 'INFO'
    }

    return $keptPayloads
}

function Get-PayloadEventsFiltered {
    <#
    .SYNOPSIS
        Removes events that the connector state marks as already sent.

    .DESCRIPTION
        Reads State.watermarks.last_sent_utc, converts it with
        ConvertFrom-RfcUtcTimestampOrNull and passes the result as the cutoff to
        Remove-PayloadEventsByCutoff. If State, State.watermarks or
        last_sent_utc is null/blank, a null cutoff is passed instead.

    .PARAMETER Payloads
        The VSphereHypervisorPayload objects to filter. May be null or empty.

    .PARAMETER State
        Connector state object; only watermarks.last_sent_utc is read. May be null.

    .OUTPUTS
        Whatever Remove-PayloadEventsByCutoff emits for the given payloads and
        cutoff.
    #>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory = $true)]
        [AllowEmptyCollection()]
        [AllowNull()]
        [VSphereHypervisorPayload[]]$Payloads,

        [Parameter(Mandatory = $false)]
        [AllowNull()]
        $State
    )

    $watermarkCutoff = $null

    # Only touch last_sent_utc once both State and State.watermarks are known to exist.
    $hasWatermarks = ($null -ne $State) -and ($null -ne $State.watermarks)
    if ($hasWatermarks -and -not [string]::IsNullOrWhiteSpace($State.watermarks.last_sent_utc)) {
        $watermarkCutoff = ConvertFrom-RfcUtcTimestampOrNull -Value $State.watermarks.last_sent_utc
    }

    return Remove-PayloadEventsByCutoff -Payloads $Payloads -CutoffUtc $watermarkCutoff
}