Public/Import-M365ToKusto.ps1
|
function Import-M365ToKusto { <# .SYNOPSIS Ingests records into a Kusto table using inline CSV ingestion. .DESCRIPTION Accepts PSObjects (from Import-M365Parquet or directly from Get-M365* cmdlets), converts to CSV inline format, and ingests via Kusto management command. #> [CmdletBinding()] param( [Parameter(Mandatory)] [string]$AccessToken, [Parameter(Mandatory)] [string]$ClusterUri, [Parameter(Mandatory)] [string]$Database, [Parameter(Mandatory)] [string]$Table, [Parameter(Mandatory, ValueFromPipeline)] [PSCustomObject[]]$InputObject, [string[]]$DynamicColumns = @(), [int]$BatchSize = 500 ) begin { $allRecords = [System.Collections.Generic.List[PSCustomObject]]::new() } process { foreach ($obj in $InputObject) { $allRecords.Add($obj) } } end { if ($allRecords.Count -eq 0) { Write-M365Log "No records to ingest" -Level Warning return 0 } $columnMapping = $allRecords[0].PSObject.Properties.Name Write-M365Log "Ingesting $($allRecords.Count) records into $Table..." Write-M365Log " Cluster: $ClusterUri" Write-M365Log " Database: $Database" $headers = @{ 'Authorization' = "Bearer $AccessToken" 'Content-Type' = 'application/json; charset=utf-8' } $mgmtUri = "$ClusterUri/v1/rest/mgmt" $totalBatches = [System.Math]::Ceiling($allRecords.Count / $BatchSize) $totalIngested = 0 for ($i = 0; $i -lt $allRecords.Count; $i += $BatchSize) { $currentBatch = [System.Math]::Floor($i / $BatchSize) + 1 $batchRecords = $allRecords[$i..([System.Math]::Min($i + $BatchSize - 1, $allRecords.Count - 1))] Write-M365Log " Batch $currentBatch of $totalBatches ($($batchRecords.Count) records)..." $csvBuilder = [System.Text.StringBuilder]::new() $isFirstLine = $true foreach ($record in $batchRecords) { $values = [System.Collections.Generic.List[string]]::new($columnMapping.Count) foreach ($col in $columnMapping) { $val = $record.$col $isDynamic = $DynamicColumns -contains $col if ($null -eq $val -or $val -eq '') { [void]$values.Add('""') } elseif ($isDynamic) { if ($val -is [string]) { $escaped = $val.Replace('"', '""') [void]$values.Add("`"$escaped`"") } else { $json = ($val | ConvertTo-Json -Compress -Depth 10) $escaped = $json.Replace('"', '""') [void]$values.Add("`"$escaped`"") } } elseif ($val -is [bool]) { [void]$values.Add($val.ToString().ToLower()) } elseif ($val -is [datetime]) { [void]$values.Add("`"$($val.ToString('o'))`"") } elseif ($val -is [string]) { if ($val -eq 'Yes') { [void]$values.Add('true') } elseif ($val -eq 'No') { [void]$values.Add('false') } else { [void]$values.Add("`"$($val.Replace('"', '""'))`"") } } else { [void]$values.Add("`"$val`"") } } if ($isFirstLine) { $isFirstLine = $false } else { [void]$csvBuilder.Append("`n") } [void]$csvBuilder.Append(($values -join ',')) } $csvData = $csvBuilder.ToString() $csvBuilder = $null $ingestCommand = ".ingest inline into table $Table <|`n$csvData" $requestBody = @{ db = $Database csl = $ingestCommand } | ConvertTo-Json -Depth 5 try { Invoke-RestMethod -Uri $mgmtUri -Method Post -Headers $headers -Body $requestBody -ContentType 'application/json; charset=utf-8' | Out-Null $totalIngested += $batchRecords.Count Write-M365Log " Batch $currentBatch ingested successfully" } catch { Write-M365Log " Error ingesting batch ${currentBatch}: $_" -Level Error throw } finally { $batchRecords = $null $csvData = $null } } Write-M365Log "Successfully ingested $totalIngested records into $Table" return $totalIngested } } |