Public/Import-M365ToKusto.ps1

function Import-M365ToKusto {
    <#
    .SYNOPSIS
        Ingests records into a Kusto table using inline CSV ingestion.
    .DESCRIPTION
        Accepts PSObjects (from Import-M365Parquet or directly from Get-M365* cmdlets),
        converts to CSV inline format, and ingests via Kusto management command.

        All pipeline input is buffered first and then posted to the cluster's
        /v1/rest/mgmt endpoint in batches of -BatchSize records, one
        '.ingest inline' command per batch. The column list (and order) is taken
        from the properties of the FIRST record; every record is serialized using
        that same property list.

        Field formatting rules:
          - $null and empty strings      -> empty quoted field ("")
          - columns in -DynamicColumns   -> strings are passed through as-is,
                                            other values are serialized with
                                            ConvertTo-Json -Compress -Depth 10
          - [bool]                       -> true / false (unquoted)
          - [datetime]                   -> round-trip ('o') format, quoted
          - strings 'Yes' / 'No'         -> true / false (case-insensitive match)
          - other strings                -> quoted, embedded quotes doubled
          - anything else                -> its string form, quoted, embedded quotes doubled
    .PARAMETER AccessToken
        Bearer token sent in the Authorization header.
    .PARAMETER ClusterUri
        Base URI of the Kusto cluster. A trailing '/' is ignored.
    .PARAMETER Database
        Target database name (sent as 'db' in the request body).
    .PARAMETER Table
        Target table name. It is inserted verbatim into the ingest command, so a
        name that needs quoting must already be quoted by the caller.
    .PARAMETER InputObject
        Records to ingest. Accepts pipeline input.
    .PARAMETER DynamicColumns
        Names of properties that should be serialized as JSON.
    .PARAMETER BatchSize
        Maximum number of records per ingest command. Must be at least 1.
    .OUTPUTS
        System.Int32. Number of records ingested (0 when there was no input).
        If a batch fails, the error is logged and rethrown; earlier batches
        have already been sent.
    #>

    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [string]$AccessToken,

        [Parameter(Mandatory)]
        [string]$ClusterUri,

        [Parameter(Mandatory)]
        [string]$Database,

        [Parameter(Mandatory)]
        [string]$Table,

        [Parameter(Mandatory, ValueFromPipeline)]
        [PSCustomObject[]]$InputObject,

        [string[]]$DynamicColumns = @(),

        # A value below 1 would divide by zero or never advance the batch loop.
        [ValidateRange(1, [int]::MaxValue)]
        [int]$BatchSize = 500
    )

    begin {
        $allRecords = [System.Collections.Generic.List[PSCustomObject]]::new()
    }

    process {
        foreach ($obj in $InputObject) {
            $allRecords.Add($obj)
        }
    }

    end {
        if ($allRecords.Count -eq 0) {
            Write-M365Log "No records to ingest" -Level Warning
            return 0
        }

        # @() keeps this an array even when the first record has a single property.
        $columnMapping = @($allRecords[0].PSObject.Properties.Name)

        # Loop-invariant: decide once per column whether it is a dynamic column.
        $columnIsDynamic = @(foreach ($col in $columnMapping) { $DynamicColumns -contains $col })

        Write-M365Log "Ingesting $($allRecords.Count) records into $Table..."
        Write-M365Log " Cluster: $ClusterUri"
        Write-M365Log " Database: $Database"

        $headers = @{
            'Authorization' = "Bearer $AccessToken"
            'Content-Type'  = 'application/json; charset=utf-8'
        }

        # Trim a trailing slash so the endpoint never becomes '...//v1/rest/mgmt'.
        $mgmtUri = "$($ClusterUri.TrimEnd('/'))/v1/rest/mgmt"
        $totalBatches = [System.Math]::Ceiling($allRecords.Count / $BatchSize)
        $totalIngested = 0
        $currentBatch = 0

        for ($i = 0; $i -lt $allRecords.Count; $i += $BatchSize) {
            $currentBatch++
            $batchCount = [System.Math]::Min($BatchSize, $allRecords.Count - $i)
            $batchRecords = $allRecords.GetRange($i, $batchCount)

            Write-M365Log " Batch $currentBatch of $totalBatches ($($batchRecords.Count) records)..."

            $csvBuilder = [System.Text.StringBuilder]::new()
            $isFirstLine = $true

            foreach ($record in $batchRecords) {
                $values = [System.Collections.Generic.List[string]]::new($columnMapping.Count)

                for ($c = 0; $c -lt $columnMapping.Count; $c++) {
                    $col = $columnMapping[$c]
                    $val = $record.$col

                    # Only $null and zero-length strings count as empty. A plain
                    # "$val -eq ''" would also match 0 and $false, because the
                    # right operand is converted to the left operand's type.
                    if ($null -eq $val -or ($val -is [string] -and $val.Length -eq 0)) {
                        [void]$values.Add('""')
                    }
                    elseif ($columnIsDynamic[$c]) {
                        if ($val -is [string]) {
                            # Strings are passed through unchanged
                            # (presumably already JSON - confirm with callers).
                            $json = $val
                        }
                        else {
                            $json = ($val | ConvertTo-Json -Compress -Depth 10)
                        }
                        [void]$values.Add('"' + $json.Replace('"', '""') + '"')
                    }
                    elseif ($val -is [bool]) {
                        [void]$values.Add($val.ToString().ToLower())
                    }
                    elseif ($val -is [datetime]) {
                        [void]$values.Add('"' + $val.ToString('o') + '"')
                    }
                    elseif ($val -is [string]) {
                        # -eq is case-insensitive for strings, so 'yes'/'YES' match as well.
                        if ($val -eq 'Yes') {
                            [void]$values.Add('true')
                        }
                        elseif ($val -eq 'No') {
                            [void]$values.Add('false')
                        }
                        else {
                            [void]$values.Add('"' + $val.Replace('"', '""') + '"')
                        }
                    }
                    else {
                        # Any other type: use its string form, doubling embedded
                        # quotes so the CSV row stays well-formed.
                        $text = "$val"
                        [void]$values.Add('"' + $text.Replace('"', '""') + '"')
                    }
                }

                if ($isFirstLine) { $isFirstLine = $false } else { [void]$csvBuilder.Append("`n") }
                [void]$csvBuilder.Append(($values -join ','))
            }

            $csvData = $csvBuilder.ToString()
            $csvBuilder = $null
            $ingestCommand = ".ingest inline into table $Table <|`n$csvData"

            $requestBody = @{
                db  = $Database
                csl = $ingestCommand
            } | ConvertTo-Json -Depth 5

            # Send explicit UTF-8 bytes so non-ASCII characters do not depend on
            # the host's default encoding for string bodies.
            $bodyBytes = [System.Text.Encoding]::UTF8.GetBytes($requestBody)

            try {
                Invoke-RestMethod -Uri $mgmtUri -Method Post -Headers $headers -Body $bodyBytes -ContentType 'application/json; charset=utf-8' | Out-Null
                $totalIngested += $batchRecords.Count
                Write-M365Log " Batch $currentBatch ingested successfully"
            }
            catch {
                Write-M365Log " Error ingesting batch ${currentBatch}: $_" -Level Error
                throw
            }
            finally {
                # Drop references to the per-batch buffers before building the next batch.
                $batchRecords = $null
                $csvData = $null
                $requestBody = $null
                $bodyBytes = $null
            }
        }

        Write-M365Log "Successfully ingested $totalIngested records into $Table"
        return $totalIngested
    }
}