functions/Import-DbaCsv.ps1

function Import-DbaCsv {
    <#
    .SYNOPSIS
        Efficiently imports very large (and small) CSV files into SQL Server.
 
    .DESCRIPTION
        Import-DbaCsv takes advantage of .NET's super fast SqlBulkCopy class to import CSV files into SQL Server.
 
        The entire import is performed within a transaction, so if a failure occurs or the script is aborted, no changes will persist.
 
        If the table or view specified does not exist and -AutoCreateTable, it will be automatically created using slow and efficient but accomodating data types.
 
    .PARAMETER Path
        Specifies path to the CSV file(s) to be imported. Multiple files may be imported at once.
 
    .PARAMETER NoHeaderRow
        By default, the first row is used to determine column names for the data being imported.
 
        Use this switch if the first row contains data and not column names.
 
    .PARAMETER Delimiter
        Specifies the delimiter used in the imported file(s). If no delimiter is specified, comma is assumed.
 
        Valid delimiters are '`t`, '|', ';',' ' and ',' (tab, pipe, semicolon, space, and comma).
 
    .PARAMETER SingleColumn
        Specifies that the file contains a single column of data. Otherwise, the delimiter check bombs.
 
    .PARAMETER SqlInstance
        The SQL Server Instance to import data into.
 
    .PARAMETER SqlCredential
        Login to the target instance using alternative credentials. Windows and SQL Authentication supported. Accepts credential objects (Get-Credential)
 
    .PARAMETER Database
        Specifies the name of the database the CSV will be imported into. Options for this this parameter are auto-populated from the server.
 
    .PARAMETER Schema
        Specifies the schema in which the SQL table or view where CSV will be imported into resides. Default is dbo
 
        If a schema name is not specified, and a CSV name with multiple dots is specified (ie; something.data.csv) then this will be interpreted as a request to import into a table [data] in the schema [something].
 
        If a schema does not currently exist, it will be created, after a prompt to confirm this. Authorization will be set to dbo by default
 
    .PARAMETER Table
        Specifies the SQL table or view where CSV will be imported into.
 
        If a table name is not specified, the table name will be automatically determined from the filename.
 
        If the table specified does not exist and -AutoCreateTable, it will be automatically created using slow and efficient but accomodating data types.
 
        If the automatically generated table datatypes do not work for you, please create the table prior to import.
 
        If you want to import specific columns from a CSV, create a view with corresponding columns.
 
    .PARAMETER Column
        Import only specific columns. To remap column names, use the ColumnMap.
 
    .PARAMETER ColumnMap
        By default, the bulk copy tries to automap columns. When it doesn't work as desired, this parameter will help. Check out the examples for more information.
 
    .PARAMETER AutoCreateTable
        Creates a table if it does not already exist. The table will be created with sub-optimal data types such as nvarchar(max)
 
    .PARAMETER Truncate
        If this switch is enabled, the destination table will be truncated prior to import.
 
    .PARAMETER NotifyAfter
        Specifies the import row count interval for reporting progress. A notification will be shown after each group of this many rows has been imported.
 
    .PARAMETER BatchSize
        Specifies the batch size for the import. Defaults to 50000.
 
    .PARAMETER TableLock
        If this switch is enabled, the SqlBulkCopy option to acquire a table lock will be used. This is automatically used if -Turbo is enabled.
 
        Per Microsoft "Obtain a bulk update lock for the duration of the bulk copy operation. When not
        specified, row locks are used."
 
    .PARAMETER CheckConstraints
        If this switch is enabled, the SqlBulkCopy option to check constraints will be used.
 
        Per Microsoft "Check constraints while data is being inserted. By default, constraints are not checked."
 
    .PARAMETER FireTriggers
        If this switch is enabled, the SqlBulkCopy option to allow insert triggers to be executed will be used.
 
        Per Microsoft "When specified, cause the server to fire the insert triggers for the rows being inserted into the database."
 
    .PARAMETER KeepIdentity
        If this switch is enabled, the SqlBulkCopy option to keep identity values from the source will be used.
 
        Per Microsoft "Preserve source identity values. When not specified, identity values are assigned by the destination."
 
    .PARAMETER KeepNulls
        If this switch is enabled, the SqlBulkCopy option to keep NULL values in the table will be used.
 
        Per Microsoft "Preserve null values in the destination table regardless of the settings for default values. When not specified, null values are replaced by default values where applicable."
 
    .PARAMETER NoProgress
        The progress bar is pretty but can slow down imports. Use this parameter to quietly import.
 
    .PARAMETER WhatIf
        Shows what would happen if the command were to run. No actions are actually performed.
 
    .PARAMETER Confirm
        Prompts you for confirmation before executing any changing operations within the command.
 
    .PARAMETER EnableException
        By default, when something goes wrong we try to catch it, interpret it and give you a friendly warning message.
        This avoids overwhelming you with "sea of red" exceptions, but is inconvenient because it basically disables advanced scripting.
        Using this switch turns this "nice by default" feature off and enables you to catch exceptions with your own try/catch.
 
    .NOTES
        Tags: Migration, Import
        Author: Chrissy LeMaire (@cl), netnerds.net
 
        Website: https://dbatools.io
        Copyright: (c) 2018 by dbatools, licensed under MIT
        License: MIT https://opensource.org/licenses/MIT
 
    .LINK
        https://dbatools.io/Import-DbaCsv
 
    .EXAMPLE
        PS C:\> Import-DbaCsv -Path C:\temp\housing.csv -SqlInstance sql001 -Database markets
 
        Imports the entire comma-delimited housing.csv to the SQL "markets" database on a SQL Server named sql001, using the first row as column names.
 
        Since a table name was not specified, the table name is automatically determined from filename as "housing".
 
    .EXAMPLE
        PS C:\> Import-DbaCsv -Path .\housing.csv -SqlInstance sql001 -Database markets -Table housing -Delimiter "`t" -NoHeaderRow
 
        Imports the entire comma-delimited housing.csv, including the first row which is not used for colum names, to the SQL markets database, into the housing table, on a SQL Server named sql001.
 
    .EXAMPLE
        PS C:\> Import-DbaCsv -Path C:\temp\huge.txt -SqlInstance sqlcluster -Database locations -Table latitudes -Delimiter "|"
 
        Imports the entire pipe-delimited huge.txt to the locations database, into the latitudes table on a SQL Server named sqlcluster.
 
    .EXAMPLE
        PS C:\> Import-DbaCsv -Path c:\temp\SingleColumn.csv -SqlInstance sql001 -Database markets -Table TempTable -SingleColumn
 
        Imports the single column CSV into TempTable
 
    .EXAMPLE
        PS C:\> Get-ChildItem -Path \\FileServer\csvs | Import-DbaCsv -SqlInstance sql001, sql002 -Database tempdb -AutoCreateTable
 
        Imports every CSV in the \\FileServer\csvs path into both sql001 and sql002's tempdb database. Each CSV will be imported into an automatically determined table name.
 
    .EXAMPLE
        PS C:\> Get-ChildItem -Path \\FileServer\csvs | Import-DbaCsv -SqlInstance sql001, sql002 -Database tempdb -AutoCreateTable -WhatIf
 
        Shows what would happen if the command were to be executed
 
    .EXAMPLE
        PS C:\> Import-DbaCsv -Path c:\temp\dataset.csv -SqlInstance sql2016 -Database tempdb -Column Name, Address, Mobile
 
        Import only Name, Address and Mobile even if other columns exist. All other columns are ignored and therefore null or default values.
 
    .EXAMPLE
        PS C:\> $columns = @{
        >> Text = 'FirstName'
        >> Number = 'PhoneNumber'
        >> }
        PS C:\> Import-DbaCsv -Path c:\temp\supersmall.csv -SqlInstance sql2016 -Database tempdb -ColumnMap $columns
 
        The CSV column 'Text' is inserted into SQL column 'FirstName' and CSV column Number is inserted into the SQL Column 'PhoneNumber'. All other columns are ignored and therefore null or default values.
    #>

    [CmdletBinding(SupportsShouldProcess, ConfirmImpact = 'Low')]
    [Diagnostics.CodeAnalysis.SuppressMessageAttribute("PSUseDeclaredVarsMoreThanAssignments", "line", Justification = "Variable line is used, False Positive on line 330")]
    param (
        [parameter(ValueFromPipeline)]
        [ValidateNotNullOrEmpty()]
        [Alias("Csv", "FullPath")]
        [object[]]$Path,
        [Parameter(Mandatory)]
        [DbaInstanceParameter[]]$SqlInstance,
        [pscredential]$SqlCredential,
        [Parameter(Mandatory)]
        [string]$Database,
        [string]$Table,
        [string]$Schema = "dbo",
        [switch]$Truncate,
        [string]$Delimiter = ",",
        [switch]$SingleColumn,
        [int]$BatchSize = 50000,
        [int]$NotifyAfter = 50000,
        [switch]$TableLock,
        [switch]$CheckConstraints,
        [switch]$FireTriggers,
        [switch]$KeepIdentity,
        [switch]$KeepNulls,
        [string[]]$Column,
        [hashtable[]]$ColumnMap,
        [switch]$AutoCreateTable,
        [switch]$NoProgress,
        [switch]$NoHeaderRow,
        [switch]$EnableException
    )
    begin {
        $FirstRowHeader = $NoHeaderRow -eq $false
        $scriptelapsed = [System.Diagnostics.Stopwatch]::StartNew()

        try {
            # SilentContinue isn't enough
            Add-Type -Path "$script:PSModuleRoot\bin\csv\LumenWorks.Framework.IO.dll" -ErrorAction Stop
        } catch {
            $null = 1
        }

        function New-SqlTable {
            <#
                .SYNOPSIS
                    Creates new Table using existing SqlCommand.
 
                    SQL datatypes based on best guess of column data within the -ColumnText parameter.
                    Columns parameter determine column names.
 
                .EXAMPLE
                    New-SqlTable -Path $Path -Delimiter $Delimiter -Columns $columns -ColumnText $columntext -SqlConn $sqlconn -Transaction $transaction
 
                .OUTPUTS
                    Creates new table
            #>

            [Diagnostics.CodeAnalysis.SuppressMessageAttribute("PSUseShouldProcessForStateChangingFunctions", "")]
            param (
                [Parameter(Mandatory)]
                [string]$Path,
                [Parameter(Mandatory)]
                [string]$Delimiter,
                [Parameter(Mandatory)]
                [bool]$FirstRowHeader,
                [System.Data.SqlClient.SqlConnection]$sqlconn,
                [System.Data.SqlClient.SqlTransaction]$transaction
            )
            $reader = New-Object LumenWorks.Framework.IO.Csv.CsvReader((New-Object System.IO.StreamReader($Path)), $FirstRowHeader, $Delimiter, 1)
            $columns = $reader.GetFieldHeaders()
            $reader.Close()
            $reader.Dispose()

            # Get SQL datatypes by best guess on first data row
            $sqldatatypes = @(); $index = 0

            foreach ($column in $Columns) {
                $sqldatatypes += "[$column] varchar(MAX)"
            }

            $sql = "BEGIN CREATE TABLE [$schema].[$table] ($($sqldatatypes -join ' NULL,')) END"
            $sqlcmd = New-Object System.Data.SqlClient.SqlCommand($sql, $sqlconn, $transaction)

            try {
                $null = $sqlcmd.ExecuteNonQuery()
            } catch {
                $errormessage = $_.Exception.Message.ToString()
                Stop-Function -Continue -Message "Failed to execute $sql. `nDid you specify the proper delimiter? `n$errormessage"
            }

            Write-Message -Level Verbose -Message "Successfully created table $schema.$table with the following column definitions:`n $($sqldatatypes -join "`n ")"
            # Write-Message -Level Warning -Message "All columns are created using a best guess, and use their maximum datatype."
            Write-Message -Level Verbose -Message "This is inefficient but allows the script to import without issues."
            Write-Message -Level Verbose -Message "Consider creating the table first using best practices if the data will be used in production."
        }

        Write-Message -Level Verbose -Message "Started at $(Get-Date)"

        # Getting the total rows copied is a challenge. Use SqlBulkCopyExtension.
        # http://stackoverflow.com/questions/1188384/sqlbulkcopy-row-count-when-complete

        $sourcecode = 'namespace System.Data.SqlClient
        {
            using Reflection;
 
            public static class SqlBulkCopyExtension
            {
                const String _rowsCopiedFieldName = "_rowsCopied";
                static FieldInfo _rowsCopiedField = null;
 
                public static int RowsCopiedCount(this SqlBulkCopy bulkCopy)
                {
                    if (_rowsCopiedField == null) _rowsCopiedField = typeof(SqlBulkCopy).GetField(_rowsCopiedFieldName, BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
                    return (int)_rowsCopiedField.GetValue(bulkCopy);
                }
            }
        }
    '

        if (-not $script:core) {
            try {
                Add-Type -ReferencedAssemblies System.Data.dll -TypeDefinition $sourcecode -ErrorAction Stop
            } catch {
                $null = 1
            }
        }
    }
    process {
        foreach ($filename in $Path) {

            if ($filename.FullName) {
                $filename = $filename.FullName
            }

            if (-not (Test-Path -Path $filename)) {
                Stop-Function -Continue -Message "$filename cannot be found"
            }

            $file = (Resolve-Path -Path $filename).ProviderPath

            # Does the second line contain the specified delimiter?
            try {
                $firstline = Get-Content -Path $file -TotalCount 2 -ErrorAction Stop
            } catch {
                Stop-Function -Continue -Message "Failure reading $file" -ErrorRecord $_
            }
            if (-not $SingleColumn) {
                if ($firstline -notmatch $Delimiter) {
                    Stop-Function -Message "Delimiter ($Delimiter) not found in first few rows of $file. If this is a single column import, please specify -SingleColumn"
                    return
                }
            }

            # Automatically generate Table name if not specified, then prompt user to confirm
            if (-not (Test-Bound -ParameterName Table)) {
                $table = [IO.Path]::GetFileNameWithoutExtension($file)
                Write-Message -Level Verbose -Message "Table name not specified, using $table"
            }

            foreach ($instance in $SqlInstance) {
                $elapsed = [System.Diagnostics.Stopwatch]::StartNew()
                # Open Connection to SQL Server
                try {
                    $server = Connect-SqlInstance -SqlInstance $instance -SqlCredential $sqlcredential -StatementTimeout 0 -MinimumVersion 9
                    $sqlconn = $server.ConnectionContext.SqlConnectionObject
                    $sqlconn.Open()
                } catch {
                    Stop-Function -Message "Failure" -Category ConnectionError -ErrorRecord $_ -Target $instance -Continue
                }

                if ($PSCmdlet.ShouldProcess($instance, "Starting transaction in $Database")) {
                    # Everything will be contained within 1 transaction, even creating a new table if required
                    # and truncating the table, if specified.
                    $transaction = $sqlconn.BeginTransaction()
                }

                # Ensure database exists
                $sql = "select count(*) from master.dbo.sysdatabases where name = '$Database'"
                $sqlcmd = New-Object System.Data.SqlClient.SqlCommand($sql, $sqlconn, $transaction)
                if (($sqlcmd.ExecuteScalar()) -eq 0) {
                    Stop-Function -Continue -Message "Database does not exist on $instance"
                }
                Write-Message -Level Verbose -Message "Database exists"
                $sqlconn.ChangeDatabase($Database)

                # Ensure Schema exists
                $sql = "select count(*) from $Database.sys.schemas where name='$schema'"
                $sqlcmd = New-Object System.Data.SqlClient.SqlCommand($sql, $sqlconn, $transaction)

                # If Schema doesn't exist create it
                # Defaulting to dbo.
                if (($sqlcmd.ExecuteScalar()) -eq 0) {
                    if (-not $AutoCreateTable) {
                        Stop-Function -Continue -Message "Schema $Schema does not exist and AutoCreateTable was not specified"
                    }
                    $sql = "CREATE SCHEMA [$schema] AUTHORIZATION dbo"
                    if ($PSCmdlet.ShouldProcess($instance, "Creating schema $schema")) {
                        $sqlcmd = New-Object System.Data.SqlClient.SqlCommand($sql, $sqlconn, $transaction)
                        try {
                            $null = $sqlcmd.ExecuteNonQuery()
                        } catch {
                            Stop-Function -Continue -Message "Could not create $schema" -ErrorRecord $_
                        }
                    }
                }

                # Ensure table or view exists
                $sql = "select count(*) from $Database.sys.tables where name = '$table' and schema_id=schema_id('$schema')"
                $sqlcmd = New-Object System.Data.SqlClient.SqlCommand($sql, $sqlconn, $transaction)

                $sql2 = "select count(*) from $Database.sys.views where name = '$table' and schema_id=schema_id('$schema')"
                $sqlcmd2 = New-Object System.Data.SqlClient.SqlCommand($sql2, $sqlconn, $transaction)

                # Create the table if required. Remember, this will occur within a transaction, so if the script fails, the
                # new table will no longer exist.
                if (($sqlcmd.ExecuteScalar()) -eq 0 -and ($sqlcmd2.ExecuteScalar()) -eq 0) {
                    if (-not $AutoCreateTable) {
                        Stop-Function -Continue -Message "Table or view $table does not exist and AutoCreateTable was not specified"
                    }
                    Write-Message -Level Verbose -Message "Table does not exist"
                    if ($PSCmdlet.ShouldProcess($instance, "Creating table $table")) {
                        New-SqlTable -Path $file -Delimiter $Delimiter -FirstRowHeader $FirstRowHeader -SqlConn $sqlconn -Transaction $transaction
                    }
                } else {
                    Write-Message -Level Verbose -Message "Table exists"
                }

                # Truncate if specified. Remember, this will occur within a transaction, so if the script fails, the
                # truncate will not be committed.
                if ($Truncate) {
                    $sql = "TRUNCATE TABLE [$schema].[$table]"
                    if ($PSCmdlet.ShouldProcess($instance, "Performing TRUNCATE TABLE [$schema].[$table] on $Database")) {
                        $sqlcmd = New-Object System.Data.SqlClient.SqlCommand($sql, $sqlconn, $transaction)
                        try {
                            $null = $sqlcmd.ExecuteNonQuery()
                        } catch {
                            Stop-Function -Continue -Message "Could not truncate $schema.$table" -ErrorRecord $_
                        }
                    }
                }

                # Setup bulk copy
                Write-Message -Level Verbose -Message "Starting bulk copy for $(Split-Path $file -Leaf)"

                # Setup bulk copy options
                $bulkCopyOptions = @()
                $options = "TableLock", "CheckConstraints", "FireTriggers", "KeepIdentity", "KeepNulls", "Default"
                foreach ($option in $options) {
                    $optionValue = Get-Variable $option -ValueOnly -ErrorAction SilentlyContinue
                    if ($optionValue -eq $true) {
                        $bulkCopyOptions += "$option"
                    }
                }
                $bulkCopyOptions = $bulkCopyOptions -join " & "

                if ($PSCmdlet.ShouldProcess($instance, "Performing import from $file")) {
                    try {
                        # Create SqlBulkCopy using default options, or options specified in command line.
                        if ($bulkCopyOptions) {
                            $bulkcopy = New-Object Data.SqlClient.SqlBulkCopy($oleconnstring, $bulkCopyOptions, $transaction)
                        } else {
                            $bulkcopy = New-Object Data.SqlClient.SqlBulkCopy($sqlconn, "Default", $transaction)
                        }

                        $bulkcopy.DestinationTableName = "[$schema].[$table]"
                        $bulkcopy.BulkCopyTimeout = 0
                        $bulkCopy.BatchSize = $BatchSize
                        $bulkCopy.NotifyAfter = $NotifyAfter
                        $bulkCopy.EnableStreaming = $true

                        if ($ColumnMap) {
                            foreach ($columnname in $ColumnMap) {
                                foreach ($key in $columnname.Keys) {
                                    $null = $bulkcopy.ColumnMappings.Add($key, $columnname[$key])
                                }
                            }
                        }

                        if ($Column) {
                            foreach ($columnname in $Column) {
                                $null = $bulkcopy.ColumnMappings.Add($columnname, $columnname)
                            }
                        }
                    } catch {
                        Stop-Function -Continue -Message "Failure" -ErrorRecord $_
                    }

                    # Write to server :D
                    try {
                        # Open the text file from disk
                        # // or using (CsvReader csv = new CsvReader(File.OpenRead(path), false, Encoding.UTF8, addMark))
                        # When addMark is true, consecutive null bytes will be replaced by [removed x null bytes] to indicate the removal
                        $reader = New-Object LumenWorks.Framework.IO.Csv.CsvReader((New-Object System.IO.StreamReader($file)), $FirstRowHeader, $Delimiter, 1)

                        # Add rowcount output
                        $bulkCopy.Add_SqlRowsCopied( {
                                $script:totalrows = $args[1].RowsCopied
                                if (-not $NoProgress) {
                                    $timetaken = [math]::Round($elapsed.Elapsed.TotalSeconds, 2)
                                    Write-ProgressHelper -StepNumber 1 -TotalSteps 2 -Activity "Importing from $file" -Message ([System.String]::Format("Progress: {0} rows in {2} seconds", $script:totalrows, $percent, $timetaken)) -ExcludePercent
                                }
                            })

                        $bulkCopy.WriteToServer($reader)

                        if ($resultcount -is [int]) {
                            Write-Progress -id 1 -activity "Inserting $resultcount rows" -status "Complete" -Completed
                        }

                        $reader.Close()
                        $reader.Dispose()
                        $completed = $true
                    } catch {
                        $completed = $false

                        if ($resultcount -is [int]) {
                            Write-Progress -id 1 -activity "Inserting $resultcount rows" -status "Failed" -Completed
                        }
                        Stop-Function -Continue -Message "Failure" -ErrorRecord $_
                    }
                }
                if ($PSCmdlet.ShouldProcess($instance, "Committing transaction")) {
                    if ($completed) {
                        # "Note: This count does not take into consideration the number of rows actually inserted when Ignore Duplicates is set to ON."
                        $null = $transaction.Commit()
                        if ($script:core) {
                            $rowscopied = "Unsupported in Core"
                        } else {
                            $rowscopied = [System.Data.SqlClient.SqlBulkCopyExtension]::RowsCopiedCount($bulkcopy)
                        }

                        Write-Message -Level Verbose -Message "$rowscopied total rows copied"

                        [pscustomobject]@{
                            ComputerName = $server.ComputerName
                            InstanceName = $server.ServiceName
                            SqlInstance  = $server.DomainInstanceName
                            Database     = $Database
                            Table        = $table
                            Schema       = $schema
                            RowsCopied   = $rowscopied
                            Elapsed      = [prettytimespan]$elapsed.Elapsed
                            Path         = $file
                        }
                    } else {
                        Stop-Function -Message "Transaction rolled back. Was the proper delimiter specified? Is the first row the column name?" -ErrorRecord $_
                        return
                    }
                }

                # Close everything just in case & ignore errors
                try {
                    $null = $sqlconn.close(); $null = $sqlconn.Dispose();
                    $null = $bulkCopy.close(); $bulkcopy.dispose();
                    $null = $reader.close(); $null = $reader.dispose()
                } catch {
                    #here to avoid an empty catch
                    $null = 1
                }
            }
        }
    }
    end {
        # Close everything just in case & ignore errors
        try {
            $null = $sqlconn.close(); $null = $sqlconn.Dispose();
            $null = $bulkCopy.close(); $bulkcopy.dispose();
            $null = $reader.close(); $null = $reader.dispose()
        } catch {
            #here to avoid an empty catch
            $null = 1
        }

        # Script is finished. Show elapsed time.
        $totaltime = [math]::Round($scriptelapsed.Elapsed.TotalSeconds, 2)
        Write-Message -Level Verbose -Message "Total Elapsed Time for everything: $totaltime seconds"

        Test-DbaDeprecation -DeprecatedOn "1.0.0" -Alias Import-DbaCsvtoSql
        Test-DbaDeprecation -DeprecatedOn "1.0.0" -EnableException:$false -Alias Import-CsvToSql
    }
}