Nya*_*ova 17 sql parallel-processing powershell multithreading asynchronous
问题:我有大量的SQL查询(大约10k-20k),并且我想在50个(或更多)线程中异步运行它们。
我为此工作编写了一个powershell脚本,但是它非常慢(执行所有操作大约需要20个小时)。期望的结果是最多3-4小时。
问题:如何优化此Powershell脚本?我应该重新考虑和使用其他技术,如python
或c#
?
我认为这是powershell的问题,因为当我检查whoisactive
查询时,它们执行得很快。创建,退出和卸载作业需要花费大量时间,因为为每个线程创建了单独的PS实例。
我的代码:
$NumberOfParallerThreads = 50;
$Arr_AllQueries = @('Exec [mystoredproc] @param1=1, @param2=2',
'Exec [mystoredproc] @param1=11, @param2=22',
'Exec [mystoredproc] @param1=111, @param2=222')
#Creating the batches
$counter = [pscustomobject] @{ Value = 0 };
$Batches_AllQueries = $Arr_AllQueries | Group-Object -Property {
[math]::Floor($counter.Value++ / $NumberOfParallerThreads)
};
forEach ($item in $Batches_AllQueries) {
$tmpBatch = $item.Group;
$tmpBatch | % {
$ScriptBlock = {
# accept the loop variable across the job-context barrier
param($query)
# Execute a command
Try
{
Write-Host "[processing '$query']"
$objConnection = New-Object System.Data.SqlClient.SqlConnection;
$objConnection.ConnectionString = 'Data Source=...';
$ObjCmd = New-Object System.Data.SqlClient.SqlCommand;
$ObjCmd.CommandText = $query;
$ObjCmd.Connection = $objConnection;
$ObjCmd.CommandTimeout = 0;
$objAdapter = New-Object System.Data.SqlClient.SqlDataAdapter;
$objAdapter.SelectCommand = $ObjCmd;
$objDataTable = New-Object System.Data.DataTable;
$objAdapter.Fill($objDataTable) | Out-Null;
$objConnection.Close();
$objConnection = $null;
}
Catch
{
$ErrorMessage = $_.Exception.Message
$FailedItem = $_.Exception.ItemName
Write-Host "[Error processing: $($query)]" -BackgroundColor Red;
Write-Host $ErrorMessage
}
}
# pass the loop variable across the job-context barrier
Start-Job $ScriptBlock -ArgumentList $_ | Out-Null
}
# Wait for all to complete
While (Get-Job -State "Running") { Start-Sleep 2 }
# Display output from all jobs
Get-Job | Receive-Job | Out-Null
# Cleanup
Remove-Job *
}
Run Code Online (Sandbox Code Playgroud)
更新:
资源:数据库服务器位于具有以下内容的远程计算机上:
我们要使用最大的cpu功率。
框架限制:唯一的限制是不使用SQL Server执行查询。请求应该来自外部资源,例如:Powershell,C#,Python等。
您需要重新组织您的脚本,以便在每个工作线程中保持一个数据库连接处于打开状态,并将其用于该线程执行的所有查询。现在您正在为每个查询打开一个新的数据库连接,这会增加大量开销。消除这种开销应该可以加快速度达到或超过您的目标。
RunspacePool是前往此处的方法,请尝试以下操作:
$AllQueries = @( ... )
$MaxThreads = 5
# Each thread keeps its own connection but shares the query queue
$ScriptBlock = {
Param($WorkQueue)
$objConnection = New-Object System.Data.SqlClient.SqlConnection
$objConnection.ConnectionString = 'Data Source=...'
$objCmd = New-Object System.Data.SqlClient.SqlCommand
$objCmd.Connection = $objConnection
$objCmd.CommandTimeout = 0
$query = ""
while ($WorkQueue.TryDequeue([ref]$query)) {
$objCmd.CommandText = $query
$objAdapter = New-Object System.Data.SqlClient.SqlDataAdapter $objCmd
$objDataTable = New-Object System.Data.DataTable
$objAdapter.Fill($objDataTable) | Out-Null
}
$objConnection.Close()
}
# create a pool
$pool = [RunspaceFactory]::CreateRunspacePool(1, $MaxThreads)
$pool.ApartmentState = 'STA'
$pool.Open()
# convert the query array into a concurrent queue
$workQueue = New-Object System.Collections.Concurrent.ConcurrentQueue[object]
$AllQueries | % { $workQueue.Enqueue($_) }
$threads = @()
# Create each powershell thread and add them to the pool
1..$MaxThreads | % {
$ps = [powershell]::Create()
$ps.RunspacePool = $pool
$ps.AddScript($ScriptBlock) | Out-Null
$ps.AddParameter('WorkQueue', $workQueue) | Out-Null
$threads += [pscustomobject]@{
Ps = $ps
Handle = $null
}
}
# Start all the threads
$threads | % { $_.Handle = $_.Ps.BeginInvoke() }
# Wait for all the threads to complete - errors will still set the IsCompleted flag
while ($threads | ? { !$_.Handle.IsCompleted }) {
Start-Sleep -Seconds 1
}
# Get any results and display an errors
$threads | % {
$_.Ps.EndInvoke($_.Handle) | Write-Output
if ($_.Ps.HadErrors) {
$_.Ps.Streams.Error.ReadAll() | Write-Error
}
}
Run Code Online (Sandbox Code Playgroud)
与Powershell作业不同,RunspacePools可以共享资源。因此,所有查询都有一个并发队列,并且每个线程都保持自己与数据库的连接。
正如其他人所说的那样-除非您要对数据库进行压力测试,否则最好将查询重新组织为批量插入。