Public/duckdb/Add-RowsToDuckDB.ps1
|
# ---------------------------------------------------------------------------
#region Apteco SqlPipeline-compatible Interface Layer
# ---------------------------------------------------------------------------
# These functions follow the Apteco SqlPipeline API (Add-RowsToSql)
# and enable native PowerShell pipeline input (|) for DuckDB.
# Compatible with: Import-Module SqlPipeline, SimplySql
# ---------------------------------------------------------------------------

function Add-RowsToDuckDB {
    <#
    .SYNOPSIS
    Inserts PSObjects directly into a DuckDB table via the PowerShell pipeline.
    Compatible with the Apteco SqlPipeline interface (Add-RowsToSql).

    .DESCRIPTION
    Buffers the pipeline objects internally and hands them to Invoke-BufferedWrite.

    Without -UseTransaction, a write is triggered each time the buffer reaches
    -BatchSize rows, and the remainder is written when the pipeline completes
    (end block). With -UseTransaction, all rows are buffered and written in a
    single call in the end block.

    If at least one row was received, a FORCE CHECKPOINT is issued afterwards
    on the same connection the rows were written to.

    The following is handled by Invoke-BufferedWrite (not visible here):
    - Automatic table creation
    - Schema evolution (new columns)
    - UPSERT (when PKColumns are specified) or plain INSERT

    .PARAMETER InputObject
    PSObject from the pipeline.

    .PARAMETER Connection
    Open DuckDB connection. If omitted, the module default connection
    ($Script:DefaultConnection) is used; an error is thrown if neither is set.

    .PARAMETER TableName
    Target table in DuckDB.

    .PARAMETER PKColumns
    Primary key columns for UPSERT. Empty = plain INSERT.

    .PARAMETER UseTransaction
    Buffers all rows and writes them in a single call at the end of the
    pipeline (safer, higher memory use). Without this flag, rows are written
    in chunks of -BatchSize while the pipeline is still running.

    .PARAMETER BatchSize
    Number of rows buffered before an intermediate write (default: 10000).
    Only relevant without -UseTransaction.

    .EXAMPLE
    # Apteco style: pipeline input
    Import-Csv '.\orders.csv' |
        Add-RowsToDuckDB -TableName 'orders' -PKColumns 'order_id' -UseTransaction -Verbose

    .EXAMPLE
    # Pipe API data directly (explicit connection)
    (Invoke-RestMethod 'https://api.example.com/orders').items |
        Add-RowsToDuckDB -Connection $conn -TableName 'orders' -PKColumns @('order_id')
    #>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory, ValueFromPipeline)]
        [PSObject]$InputObject,

        [Parameter(Mandatory=$false)]
        [DuckDB.NET.Data.DuckDBConnection]$Connection = $null,

        [Parameter(Mandatory)]
        [string]$TableName,

        [string[]]$PKColumns = @(),

        [switch]$UseTransaction,

        [int]$BatchSize = 10000
    )

    begin {
        # Fall back to the module-wide default connection when none was passed.
        if ($null -eq $Connection) {
            $Connection = $Script:DefaultConnection
            if ($null -eq $Connection) {
                throw "No active DuckDB connection. Provide -Connection or call Initialize-SQLPipeline first."
            }
        }

        $buffer   = [System.Collections.Generic.List[PSObject]]::new()
        $rowCount = 0   # total rows received, independent of intermediate flushes

        Write-Verbose "[$TableName] Add-RowsToDuckDB started (UseTransaction=$UseTransaction, BatchSize=$BatchSize)"
    }

    process {
        $buffer.Add($InputObject)
        $rowCount++

        # Without UseTransaction: write in batches once BatchSize is reached
        if (-not $UseTransaction -and $buffer.Count -ge $BatchSize) {
            Write-Verbose "[$TableName] Batch write: $($buffer.Count) rows"
            Invoke-BufferedWrite -Connection $Connection -TableName $TableName `
                -Data $buffer -PKColumns $PKColumns
            $buffer.Clear()
        }
    }

    end {
        # Decide on $rowCount rather than $buffer.Count. In batch mode the buffer
        # is legitimately empty here whenever the row total is an exact multiple
        # of BatchSize, and the summary and checkpoint must still run then.
        if ($rowCount -eq 0) {
            Write-Verbose "[$TableName] No data in pipeline."
            return
        }

        if ($buffer.Count -gt 0) {
            Write-Verbose "[$TableName] Final write: $($buffer.Count) rows (total: $rowCount)"
            Invoke-BufferedWrite -Connection $Connection -TableName $TableName `
                -Data $buffer -PKColumns $PKColumns
            $buffer.Clear()
        }

        Write-Information "[$TableName] $rowCount rows inserted via pipeline."

        # Checkpoint the SAME connection the rows were written to. Previously this
        # went through a helper without -Connection, so an explicitly passed
        # connection was not the one being checkpointed.
        # NOTE(review): per the DuckDB CHECKPOINT docs, the FORCE variant aborts
        # in-progress transactions instead of waiting for them. For an in-memory
        # database there is presumably nothing to flush - confirm whether plain
        # CHECKPOINT is sufficient here.
        $checkpointCommand = $Connection.CreateCommand()
        try {
            $checkpointCommand.CommandText = "FORCE CHECKPOINT"
            [void]$checkpointCommand.ExecuteNonQuery()
        }
        finally {
            $checkpointCommand.Dispose()
        }
    }
}

#endregion