Private/Invoke-EventhouseIngest.ps1

function Invoke-EventhouseIngest {
    <#
    .SYNOPSIS
        Stream a single record (or an array of records) to an Eventhouse table.
    .DESCRIPTION
        Internal helper. Uses the Kusto streaming ingest REST endpoint with the
        JSON ingestion mapping configured in 03-ingestion-mappings.kql.
        Retries on transient failures with exponential backoff. Falls back to
        the disk spool when the cluster is unreachable.
    #>

    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [string] $TableName,
        [Parameter(Mandatory)]
        [string] $MappingName,
        [Parameter(Mandatory)]
        [object[]] $Records,
        [int] $MaxRetries = 3
    )

    if (-not $script:FDAState.EventhouseIngestUri) {
        throw 'Eventhouse ingest URI is not set. Call Initialize-FDAObservability and Connect-FDAObservability.'
    }

    # Serialize as multi-line JSON (JSONL).
    $body = ($Records | ForEach-Object {
        $_ | ConvertTo-Json -Compress -Depth 100
    }) -join "`n"

    $database = [uri]::EscapeDataString($script:FDAState.DatabaseName)
    $table = [uri]::EscapeDataString($TableName)
    $mapping = [uri]::EscapeDataString($MappingName)
    $url = '{0}/v1/rest/ingest/{1}/{2}?streamFormat=multijson&mappingName={3}' -f `
        $script:FDAState.EventhouseIngestUri.TrimEnd('/'), $database, $table, $mapping

    $attempt = 0
    $delaySec = 1
    while ($true) {
        $attempt++
        try {
            $token = Get-FDAAccessToken -Scope 'https://kusto.fabric.microsoft.com/.default'
            $headers = @{
                Authorization       = "Bearer $token"
                'Content-Type'      = 'application/json; charset=utf-8'
                'x-ms-client-version' = 'PSFabricDataAgentObservability/1.0.0'
                'x-ms-client-request-id' = [guid]::NewGuid().ToString()
            }
            $resp = Invoke-RestMethod -Method Post -Uri $url -Headers $headers -Body $body -ErrorAction Stop
            return $resp
        } catch {
            $sc = Get-FDAHttpStatusCode -ErrorRecord $_
            $isTransient = ($sc -in 408, 429, 500, 502, 503, 504) -or
                           ($_.Exception -is [System.Net.WebException]) -or
                           ($_.Exception.Message -match 'transient|timeout|temporarily')
            if ($attempt -ge $MaxRetries -or -not $isTransient) {
                # Spool to disk so we don't lose the events.
                try { Save-FDASpool -TableName $TableName -MappingName $MappingName -Records $Records } catch { Write-Verbose "Spool fallback failed: $($_.Exception.Message)" }
                throw "Eventhouse ingest failed after $attempt attempts: $($_.Exception.Message)"
            }
            Start-Sleep -Seconds $delaySec
            $delaySec = [Math]::Min($delaySec * 2, 30)
        }
    }
}

function Save-FDASpool {
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)] [string] $TableName,
        [Parameter(Mandatory)] [string] $MappingName,
        [Parameter(Mandatory)] [object[]] $Records
    )
    $spoolDir = $script:FDAState.SpoolPath
    if (-not (Test-Path $spoolDir)) {
        New-Item -ItemType Directory -Path $spoolDir -Force | Out-Null
    }
    $fileName = '{0}_{1}_{2}.spool.json' -f `
        $TableName, $MappingName, [guid]::NewGuid().ToString()
    $path = Join-Path $spoolDir $fileName
    @{
        TableName   = $TableName
        MappingName = $MappingName
        Records     = $Records
        SpooledAt   = (Get-Date).ToUniversalTime().ToString('o')
    } | ConvertTo-Json -Depth 100 | Set-Content -Path $path -Encoding UTF8
    Write-Verbose "Spooled $($Records.Count) records to $path"
}

function Restore-FDASpool {
    <#
    .SYNOPSIS
        Drain the local spool directory after connectivity is restored.
    #>

    [CmdletBinding()]
    param()
    $spoolDir = $script:FDAState.SpoolPath
    if (-not (Test-Path $spoolDir)) { return }
    $files = Get-ChildItem -Path $spoolDir -Filter '*.spool.json' -File -ErrorAction SilentlyContinue
    foreach ($f in $files) {
        try {
            $payload = Get-Content -Path $f.FullName -Raw | ConvertFrom-Json -Depth 100
            Invoke-EventhouseIngest -TableName $payload.TableName -MappingName $payload.MappingName -Records $payload.Records | Out-Null
            Remove-Item -Path $f.FullName -Force -ErrorAction SilentlyContinue
            Write-Verbose "Drained spool file $($f.Name)"
        } catch {
            Write-Warning "Could not drain spool file $($f.Name): $($_.Exception.Message)"
        }
    }
}