Providers/MySql/provider.ps1

Class MySqlProvider : ProviderBase {
    
    MySqlProvider([string]$ConnectionName
                , [int]$CommandTimeout
                , [MySql.Data.MySqlClient.MySqlConnection]$Connection) {

        $this.ConnectionName = $ConnectionName
        $this.CommandTimeout = $CommandTimeout
        $this.Connection = $Connection

        $messages = $this.Messages
        $handler = {Param($sender, [MySql.Data.MySqlClient.MySqlInfoMessageEventArgs]$e)
            $messages.Enqueue(([SqlMessage]@{Received=(Get-Date); Message=$e.Message}))
        }.GetNewClosure()

        $this.Connection.add_InfoMessage([MySql.Data.MySqlClient.MySqlInfoMessageEventHandler]$handler)
    }

    [string] ProviderType() { return "MySql" }
    
    [PSCustomObject] ConnectionInfo() {
        return [PSCustomObject]@{
            ConnectionName = $this.ConnectionName
            ProviderType = $this.ProviderType()
            ConnectionState = $this.Connection.State
            ConnectionString = $this.Connection.ConnectionString
            ServerVersion = $this.Connection.ServerVersion
            DataSource = $this.Connection.DataSource
            Database = $this.Connection.Database
            CommandTimeout = $this.CommandTimeout
            HasTransaction = $this.HasTransaction()
        }
    }

    [System.Data.DataSet] GetDataSet([System.Data.IDbCommand]$cmd) {
        $ds = [System.Data.DataSet]::new()
        $da = [MySql.Data.MySqlClient.MySqlDataAdapter]::new($cmd)
        Try {
            $da.Fill($ds)
            return $ds 
        }
        Catch { Throw $_ }
        Finally { $da.dispose() }
    }

    [void] ChangeDatabase([string]$DatabaseName) { $this.Connection.ChangeDatabase($DatabaseName) }

    [long] BulkLoad([System.Data.IDataReader]$DataReader
                    , [string]$DestinationTable
                    , [hashtable]$ColumnMap = @{}
                    , [int]$BatchSize
                    , [int]$BatchTimeout
                    , [ScriptBlock]$Notify) {

        $BatchSize -= $BatchSize % 10
        $SchemaMap = @()
        [long]$batchIteration = 0
        [int]$ord = 0
        $DataReader.GetSchemaTable().Rows | Sort-Object ColumnOrdinal | ForEach-Object { $SchemaMap += [PSCustomObject]@{Ordinal = $ord; SrcName = $_["ColumnName"]; DestName = $_["ColumnName"]}; $ord += 1}

        If($ColumnMap -and $ColumnMap.Count -gt 0) {
            $SchemaMap = $SchemaMap |
                Where-Object SrcName -In $ColumnMap.Keys |
                ForEach-Object { $_.DestName = $ColumnMap[$_.SrcName]; $_ }
        }

        [string[]]$DestNames = $SchemaMap | Select-Object -ExpandProperty DestName
        [string]$ValueSql = (1..10 | ForEach-Object { "(@Param" + (($SchemaMap | ForEach-Object Ordinal) -join ("_{0}, @Param" -f $_)) + ("_{0})" -f $_) }) -join ", "
        [string]$InsertSql = 'INSERT INTO {0} (`{1}`) VALUES {2}' -f $DestinationTable, ($DestNames -join '`, `'), $ValueSql

        $bulkCmd = $this.GetCommand($InsertSql, -1, @{})
        $bulkCmd.Prepare()
        Try {
            $bulkCmd.Transaction = $this.Connection.BeginTransaction()
            $sw = [System.Diagnostics.Stopwatch]::StartNew()
            While($DataReader.Read()) {                
                $batchIteration += 1
                $r = $batchIteration % 10
                If($r -eq 0) { $r = 10 }
                If($batchIteration -le 10) {
                    $SchemaMap.ForEach({
                        $p = 'Param{0}_{1}' -f $_.Ordinal, $r
                        $bulkCmd.Parameters.AddWithValue($p, $DataReader.GetValue($_.Ordinal))
                    })
                }
                Else {
                    $SchemaMap.ForEach({
                        $p = 'Param{0}_{1}' -f $_.Ordinal, $r
                        $bulkCmd.Parameters[$p].Value = $DataReader.GetValue($_.Ordinal)
                    })
                }

                If($r -eq 10) { $null = $bulkCmd.ExecuteNonQuery() }
                If($sw.Elapsed.TotalSeconds -gt $BatchTimeout) { Throw [System.TimeoutException]::new(("Batch took longer than {0} seconds to complete." -f $BatchTimeout)) }
                
                If($batchIteration % $BatchSize -eq 0) {
                    $bulkCmd.Transaction.Commit()
                    $bulkCmd.Transaction.Dispose()
                    If($Notify) { $Notify.Invoke($batchIteration) }
                    $bulkCmd.Transaction = $this.Connection.BeginTransaction()
                    $sw.Restart()
                }
            }
            $r = $batchIteration % 10
            If($r -eq 0) { $r = 10 }
            If($r -ne 10) {
                [string]$ValueSql = ((1..$r) | ForEach-Object { "(@Param" + (($SchemaMap | ForEach-Object Ordinal) -join ("_{0}, @Param" -f $_)) + ("_{0})" -f $_) }) -join ", "
                [string]$InsertSql = 'INSERT INTO {0} (`{1}`) VALUES {2}' -f $DestinationTable, ($DestNames -join '`, `'), $ValueSql
                $bulkCmd.CommandText = $InsertSql
                $bulkCmd.Prepare()
                [int]$mr = $r * $SchemaMap.Count
                While($bulkCmd.Parameters.Count -gt $mr){ $null = $bulkCmd.Parameters.RemoveAt($mr) }
                $null = $bulkCmd.ExecuteNonQuery()
            }

            $bulkCmd.Transaction.Commit()
            $bulkCmd.Transaction.Dispose()
            $bulkCmd.Transaction = $null
        }
        Finally {
            If($bulkCmd.Transaction) { $bulkCmd.Transaction.Dispose() }
            $bulkCmd.Dispose()
            $DataReader.Close()
            $DataReader.Dispose()
        }
        Return $batchIteration
    }
}