如何异步和在线程中执行大量的SQL查询

Nya*_*ova 17 sql parallel-processing powershell multithreading asynchronous

问题:我有大量的SQL查询(大约10k-20k),并且我想在50个(或更多)线程中异步运行它们。

我为此工作编写了一个powershell脚本,但是它非常慢(执行所有操作大约需要20个小时)。期望的结果是最多3-4小时。

问题:如何优化此Powershell脚本?我应该重新考虑和使用其他技术,如pythonc#

我认为这是powershell的问题,因为当我检查whoisactive查询时,它们执行得很快。创建,退出和卸载作业需要花费大量时间,因为为每个线程创建了单独的PS实例。

我的代码:

$NumberOfParallerThreads = 50;


$Arr_AllQueries = @('Exec [mystoredproc] @param1=1, @param2=2',
                    'Exec [mystoredproc] @param1=11, @param2=22',
                    'Exec [mystoredproc] @param1=111, @param2=222')

#Creating the batches
$counter = [pscustomobject] @{ Value = 0 };
$Batches_AllQueries = $Arr_AllQueries | Group-Object -Property { 
    [math]::Floor($counter.Value++ / $NumberOfParallerThreads) 
};

forEach ($item in $Batches_AllQueries) {
    $tmpBatch = $item.Group;

    $tmpBatch | % {

        $ScriptBlock = {
            # accept the loop variable across the job-context barrier
            param($query) 
            # Execute a command

            Try 
            {
                Write-Host "[processing '$query']"
                $objConnection = New-Object System.Data.SqlClient.SqlConnection;
                $objConnection.ConnectionString = 'Data Source=...';

                $ObjCmd = New-Object System.Data.SqlClient.SqlCommand;
                $ObjCmd.CommandText = $query;
                $ObjCmd.Connection = $objConnection;
                $ObjCmd.CommandTimeout = 0;

                $objAdapter = New-Object System.Data.SqlClient.SqlDataAdapter;
                $objAdapter.SelectCommand = $ObjCmd;
                $objDataTable = New-Object System.Data.DataTable;
                $objAdapter.Fill($objDataTable)  | Out-Null;

                $objConnection.Close();
                $objConnection = $null;
            } 
            Catch 
            { 
                $ErrorMessage = $_.Exception.Message
                $FailedItem = $_.Exception.ItemName
                Write-Host "[Error processing: $($query)]" -BackgroundColor Red;
                Write-Host $ErrorMessage 
            }

        }

        # pass the loop variable across the job-context barrier
        Start-Job $ScriptBlock -ArgumentList $_ | Out-Null
    }

    # Wait for all to complete
    While (Get-Job -State "Running") { Start-Sleep 2 }

    # Display output from all jobs
    Get-Job | Receive-Job | Out-Null

    # Cleanup
    Remove-Job *

}
Run Code Online (Sandbox Code Playgroud)

更新

资源:数据库服务器位于具有以下内容的远程计算机上:

  • 24GB RAM,
  • 8核
  • 500GB存储空间,
  • SQL Server 2016

我们要使用最大的cpu功率。

框架限制:唯一的限制是使用SQL Server执行查询。请求应该来自外部资源,例如:Powershell,C#,Python等。

War*_*Dew 5

您需要重新组织您的脚本,以便在每个工作线程中保持一个数据库连接处于打开状态,并将其用于该线程执行的所有查询。现在您正在为每个查询打开一个新的数据库连接,这会增加大量开销。消除这种开销应该可以加快速度达到或超过您的目标。


ant*_*oni 5

RunspacePool是前往此处的方法,请尝试以下操作:

$AllQueries = @( ... )
$MaxThreads = 5

# Each thread keeps its own connection but shares the query queue
$ScriptBlock = {
    Param($WorkQueue)

    $objConnection = New-Object System.Data.SqlClient.SqlConnection
    $objConnection.ConnectionString = 'Data Source=...'

    $objCmd = New-Object System.Data.SqlClient.SqlCommand
    $objCmd.Connection = $objConnection
    $objCmd.CommandTimeout = 0

    $query = ""

    while ($WorkQueue.TryDequeue([ref]$query)) {
        $objCmd.CommandText = $query
        $objAdapter = New-Object System.Data.SqlClient.SqlDataAdapter $objCmd
        $objDataTable = New-Object System.Data.DataTable
        $objAdapter.Fill($objDataTable) | Out-Null
    }

    $objConnection.Close()

}

# create a pool
$pool = [RunspaceFactory]::CreateRunspacePool(1, $MaxThreads)
$pool.ApartmentState  = 'STA'
$pool.Open()

# convert the query array into a concurrent queue
$workQueue = New-Object System.Collections.Concurrent.ConcurrentQueue[object]
$AllQueries | % { $workQueue.Enqueue($_) }

$threads = @()

# Create each powershell thread and add them to the pool
1..$MaxThreads | % {
    $ps = [powershell]::Create()
    $ps.RunspacePool = $pool
    $ps.AddScript($ScriptBlock) | Out-Null
    $ps.AddParameter('WorkQueue', $workQueue) | Out-Null
    $threads += [pscustomobject]@{
        Ps = $ps
        Handle = $null
    }
}

# Start all the threads
$threads | % { $_.Handle = $_.Ps.BeginInvoke() }

# Wait for all the threads to complete - errors will still set the IsCompleted flag
while ($threads | ? { !$_.Handle.IsCompleted }) {
    Start-Sleep -Seconds 1
}

# Get any results and display an errors
$threads | % {
    $_.Ps.EndInvoke($_.Handle) | Write-Output
    if ($_.Ps.HadErrors) {
        $_.Ps.Streams.Error.ReadAll() | Write-Error
    }
}
Run Code Online (Sandbox Code Playgroud)

与Powershell作业不同,RunspacePools可以共享资源。因此,所有查询都有一个并发队列,并且每个线程都保持自己与数据库的连接。

正如其他人所说的那样-除非您要对数据库进行压力测试,否则最好将查询重新组织为批量插入。