tool/Automation/functions/Invoke-GatherEmails.ps1
|
function Invoke-GatherEmails {
    <#
    .SYNOPSIS
        Scan a folder for .eml files and produce email-manifest.json.

    .DESCRIPTION
        Deterministic gather stage. Recursively scans the target folder for .eml and
        .emltpl files, parses each using Python's email module, and writes structured
        metadata to email-manifest.json. No AI calls. Fast, repeatable, free.

        Each file is parsed by a separate Python process. Results (including
        per-file failures) are appended to email-manifest.jsonl as they are produced,
        then assembled into email-manifest.json; the JSONL file is removed afterwards.

    .PARAMETER Path
        Path to folder containing .eml files.

    .PARAMETER OutputPath
        Path to data/ directory where email-manifest.json will be written.
        The directory is created if it does not exist.

    .OUTPUTS
        PSCustomObject with total_files, total_parsed, total_errors, output_path.
    #>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [string]$Path,

        [Parameter(Mandatory)]
        [string]$OutputPath
    )

    # Find all email files recursively. @() guarantees an array even for 0 or 1 match.
    $emailFiles = @(Get-ChildItem -Path $Path -Recurse -File -Include '*.eml', '*.emltpl')
    if ($emailFiles.Count -eq 0) {
        throw "No .eml or .emltpl files found in: $Path"
    }

    # Absolute form of the scan root, so relative paths are computed correctly even
    # when -Path is relative or carries a trailing separator.
    $resolvedRoot = Resolve-Path -Path $Path -ErrorAction SilentlyContinue | Select-Object -First 1
    $rootPath = if ($resolvedRoot) { $resolvedRoot.ProviderPath } else { $Path }

    # Resolve Python path (prefer a .venv three directory levels above this script).
    $pythonPath = $null
    $repoRoot = $PSScriptRoot
    for ($level = 0; $level -lt 3 -and $repoRoot; $level++) {
        # Stop climbing as soon as there is no parent left (empty $PSScriptRoot, shallow path).
        $repoRoot = Split-Path -Path $repoRoot -Parent
    }
    if ($repoRoot) {
        foreach ($candidate in '.venv\Scripts\python.exe', '.venv/bin/python') {
            $venvPython = Join-Path $repoRoot $candidate
            if (Test-Path -LiteralPath $venvPython -PathType Leaf) {
                $pythonPath = $venvPython
                break
            }
        }
    }
    if (-not $pythonPath) {
        foreach ($commandName in 'python', 'python3') {
            # Applications only, first hit only: aliases/functions have no usable
            # Source, and several matches would otherwise yield an array.
            $found = Get-Command $commandName -CommandType Application -ErrorAction SilentlyContinue |
                Select-Object -First 1
            if ($found) {
                $pythonPath = $found.Source
                break
            }
        }
    }
    if (-not $pythonPath) {
        throw "Python not found. Ensure Python is available in PATH or a .venv exists."
    }

    # Python parser. Single-quoted here-string: PowerShell performs no interpolation,
    # and the file to parse arrives as sys.argv[1] instead of being spliced into the
    # source, so unusual file names cannot break or alter the script.
    $pythonScript = @'
import sys, json, email, email.policy, email.utils


def parse_addr(addr_str):
    """Return a list of {"name", "email"} dicts for an address header value."""
    if not addr_str:
        return []
    addresses = email.utils.getaddresses([str(addr_str)])
    return [{"name": name or "", "email": addr} for name, addr in addresses if addr]


def first_text(msg, content_type):
    """Return the text of the first part of the given type, or "" if none."""
    for part in msg.walk():
        if part.get_content_type() == content_type:
            payload = part.get_content()
            return payload if isinstance(payload, str) else ""
    return ""


def attachment_size(part):
    """Size in bytes of the decoded attachment payload."""
    raw = part.get_payload(decode=True)
    if raw is not None:
        return len(raw)
    # Container parts (e.g. an attached message/rfc822) have no single payload.
    nested = part.get_payload()
    if isinstance(nested, list):
        return sum(len(sub.as_bytes()) for sub in nested)
    return 0


def parse_email(file_path):
    with open(file_path, 'rb') as f:
        msg = email.message_from_binary_file(f, policy=email.policy.default)

    # Extract plain text body, falling back to HTML for multipart messages
    body = ""
    if msg.is_multipart():
        body = first_text(msg, 'text/plain') or first_text(msg, 'text/html')
    else:
        payload = msg.get_content()
        if isinstance(payload, str):
            body = payload

    # Extract attachments list
    attachments = []
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_disposition() == 'attachment':
                attachments.append({
                    "filename": part.get_filename() or "unnamed",
                    "content_type": part.get_content_type(),
                    "size_bytes": attachment_size(part),
                })

    # Parse date; keep the raw header text when it cannot be parsed
    date_str = msg.get('Date', '')
    iso_date = None
    if date_str:
        try:
            iso_date = email.utils.parsedate_to_datetime(str(date_str)).isoformat()
        except Exception:
            iso_date = str(date_str)

    from_addrs = parse_addr(msg.get('From', ''))
    from_entry = from_addrs[0] if from_addrs else {"name": "", "email": ""}

    result = {
        "headers": {
            "subject": str(msg.get('Subject', '')),
            "date": iso_date,
            "from": from_entry,
            "to": parse_addr(msg.get('To', '')),
            "cc": parse_addr(msg.get('Cc', '')),
            "message_id": str(msg.get('Message-ID', '')),
            "in_reply_to": str(msg.get('In-Reply-To', '')),
            "references": str(msg.get('References', '')),
        },
        "body": {
            "plain_text": body,
            "word_count": len(body.split()) if body else 0,
            "language": "en",
        },
        "attachments": attachments,
        "parse_error": None,
    }
    # ASCII-only JSON (\uXXXX escapes): the caller decodes this process's stdout
    # with the console code page, which is frequently not UTF-8 on Windows.
    print(json.dumps(result, ensure_ascii=True))


parse_email(sys.argv[1])
'@

    # Make sure the output directory exists before anything is written to it.
    if (-not (Test-Path -LiteralPath $OutputPath -PathType Container)) {
        New-Item -ItemType Directory -Path $OutputPath -Force | Out-Null
    }

    # Parse emails, writing incrementally to JSONL
    $jsonlPath = Join-Path $OutputPath 'email-manifest.jsonl'
    $manifestPath = Join-Path $OutputPath 'email-manifest.json'
    $errors = [System.Collections.Generic.List[object]]::new()
    $parsedCount = 0

    # Clear any existing JSONL from a previous partial run
    if (Test-Path -LiteralPath $jsonlPath) {
        Remove-Item -LiteralPath $jsonlPath -Force
    }

    # The parser script is identical for every file: write it to disk once.
    $scriptFile = Join-Path ([System.IO.Path]::GetTempPath()) ('gather-emails-{0}.py' -f [guid]::NewGuid().ToString('N'))
    Set-Content -LiteralPath $scriptFile -Value $pythonScript -Encoding UTF8

    try {
        foreach ($file in $emailFiles) {
            # Computed outside the try block so the error entry for this file can
            # never pick up the previous iteration's path.
            $relativePath = [System.IO.Path]::GetRelativePath($rootPath, $file.FullName)

            $emailEntry = [ordered]@{
                file_path       = $relativePath
                file_name       = $file.Name
                file_size_bytes = $file.Length
                parsed_at       = $null
                headers         = $null
                body            = $null
                attachments     = @()
                parse_error     = $null
            }

            try {
                # stderr lines come back as ErrorRecord objects; stdout lines as strings.
                $output = & $pythonPath $scriptFile $file.FullName 2>&1
                $exitCode = $LASTEXITCODE
                $stderr = ($output | Where-Object { $_ -is [System.Management.Automation.ErrorRecord] }) -join "`n"
                $stdout = ($output | Where-Object { $_ -isnot [System.Management.Automation.ErrorRecord] }) -join "`n"

                if ($exitCode -ne 0 -or -not $stdout) {
                    if (-not $stderr) {
                        $stderr = "Python exited with code $exitCode without producing output."
                    }
                    throw $stderr
                }

                $parsed = $stdout | ConvertFrom-Json
                $emailEntry.headers = $parsed.headers
                $emailEntry.body = $parsed.body
                $emailEntry.attachments = @($parsed.attachments)
                $parsedCount++
            }
            catch {
                $emailEntry.parse_error = $_.Exception.Message
                $errors.Add([ordered]@{
                        file  = $relativePath
                        error = $_.Exception.Message
                    })
            }

            # Successful and failed entries are both appended to the JSONL
            $emailEntry.parsed_at = (Get-Date).ToString('o')
            ($emailEntry | ConvertTo-Json -Depth 10 -Compress) |
                Add-Content -LiteralPath $jsonlPath -Encoding UTF8
        }
    }
    finally {
        Remove-Item -LiteralPath $scriptFile -Force -ErrorAction SilentlyContinue
    }

    # Assemble final JSON from JSONL. @() keeps "emails" a JSON array even when
    # there is exactly one entry (a single pipeline result would otherwise unroll).
    $emails = @()
    if (Test-Path -LiteralPath $jsonlPath) {
        $emails = @(
            Get-Content -LiteralPath $jsonlPath -Encoding UTF8 |
                Where-Object { $_.Trim() } |
                ForEach-Object { $_ | ConvertFrom-Json }
        )
    }

    $manifest = [ordered]@{
        metadata = [ordered]@{
            generated_at  = (Get-Date).ToString('o')
            source_folder = $Path
            total_files   = $emailFiles.Count
            total_parsed  = $parsedCount
            total_errors  = $errors.Count
        }
        emails   = $emails
    }

    $manifest | ConvertTo-Json -Depth 10 | Set-Content -LiteralPath $manifestPath -Encoding UTF8

    # Clean up JSONL (final JSON is the source of truth)
    if (Test-Path -LiteralPath $jsonlPath) {
        Remove-Item -LiteralPath $jsonlPath -Force
    }

    # Return summary
    [PSCustomObject]@{
        total_files  = $emailFiles.Count
        total_parsed = $parsedCount
        total_errors = $errors.Count
        output_path  = $manifestPath
    }
}