How can I create jobs that yield results as they complete?

Nic*_*ugh 5 powershell multithreading

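Problem

Suppose you have four machines:

  • MachineA is slow,
  • MachineB is medium speed,
  • MachineC is fast,
  • LocalHost is super fast.

On each of the remote machines, you want to sum the first million primes. You could do this from LocalHost: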

$servers = @("MachineA","MachineB","MachineC")
Invoke-Command -ComputerName $servers -ScriptBlock {
    Sum-FirstMillionPrimes
}

As written, no results are displayed (yielded) until the slowest machine has finished.

To speed this up, you try running it as a job:

$servers = @("MachineA","MachineB","MachineC")
Invoke-Command -ComputerName $servers -ScriptBlock {
    Sum-FirstMillionPrimes
} -AsJob

while ($null -ne (Get-Job)) {
    $doneChildJob = Get-Job | Wait-Job -Any
    $processResult = $doneChildJob | Receive-Job -AutoRemoveJob -Wait
    $processResult
}

This still has the same problem because, according to the documentation (Example 8):

The command uses the AsJob parameter to run the command as a background
job. This command returns a job object that contains two child job
objects, one for each of the jobs run on the two remote computers.

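For us, this means that three child jobs are running, but the parent job does not return until all of the child jobs have completed.

How can you write this so that each child job's results are returned as soon as that job completes?

What I've tried

We came up with a solution that seems to work, but this problem seems common enough that there should be a PowerShell way to handle it.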
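The parent/child relationship can be inspected without any remote machines. The following is a minimal sketch, assuming that a local Start-Job (which also runs its work in a single child job) is an acceptable stand-in for Invoke-Command -AsJob; the output string is made up for illustration.

```powershell
# Local stand-in for Invoke-Command -AsJob (an assumption for this sketch):
# Start-Job also returns a parent job whose work runs in a child job.
$parent = Start-Job -ScriptBlock { 'hello from the child job' }
$parent | Wait-Job | Out-Null

# The child jobs hang off the parent's ChildJobs property.
$childCount = $parent.ChildJobs.Count
$parent.ChildJobs | Format-Table Id, Name, State

# Output can be received from a child job directly, without going through the parent.
$childOutput = $parent.ChildJobs[0] | Receive-Job
$childOutput

# Remove the parent job when done.
Remove-Job -Job $parent
```

With Invoke-Command -ComputerName $servers -AsJob, ChildJobs would hold one entry per computer, as the quoted documentation describes.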

# Create a HashSet of jobs that have already been processed. This is important
# because child jobs cannot be removed via Remove-Job. There doesn't seem to be
# a way to determine if the job has been received
[System.Collections.Generic.HashSet[int]]$processedJobIds = @()
while ($null -ne (Get-Job)) {
    # We only want to attempt to process jobs that have no children that we
    # haven't seen. The -IncludeChildJob parameter allows us to see the nested
    # children jobs from Invoke-Command -AsJob. Because we can't determine if a
    # child job has already been received, we filter based on our above hashset.
    $doneChildJob = Get-Job -IncludeChildJob | Where-Object { $_.ChildJobs.Count -eq 0 -and (-not ($processedJobIds.Contains($_.Id))) } | Wait-Job -Any
    if ($null -eq $doneChildJob) {
        #   The $doneChildJob filter will exclude the parent job created by
        # Invoke-Command -AsJob. However, we still need to eventually remove
        # this job, otherwise we'd hit an infinite loop.
        #   The assumption is that the only way that $doneChildJob will evaluate to
        # $null is if all child jobs have completed. If all child jobs are
        # completed, the remaining job(s) should be safe to remove as they are
        # expected to be parent jobs.
        Get-Job | Remove-Job
    }
    else {
        # We need to process the child jobs
        $processResult = $doneChildJob | Receive-Job -Wait
        $processResult
        $processedJobIds.Add($doneChildJob.Id) | Out-Null
        # By default, Get-Job does not return children jobs (i.e they are
        # parents and can be removed by Remove-Job). Based on this behavior,
        # if $processedJobIds contains any of these jobs, they are safe to
        # remove, and should also be removed from our $processedJobIds list.
        Get-Job | Where-Object { $processedJobIds.Contains($_.Id) } | ForEach-Object {
            $processedJobIds.Remove($_.Id) | Out-Null
            Remove-Job $_
        }
    }
}

We ran it against the following examples, and it seems to work:

Import-Module ThreadJob

$servers = @("MachineA", "MachineB", "MachineC")
$sessions = New-PSSession -ComputerName $servers

Invoke-Command -Session $sessions -ScriptBlock {
    $computerName = [System.Net.Dns]::GetHostName()
    $firstMillionPrimes = Sum-FirstMillionPrimes
    Write-Output "$computerName - $firstMillionPrimes"
} -AsJob | Out-Null

# It should also handle when one of the child jobs fails but not all
Invoke-Command -ComputerName $servers -ScriptBlock {
    $computerName = [System.Net.Dns]::GetHostName()
    if ($computerName -eq "MachineA") {
        Throw "This is a remote invoke FAILURE on $computerName"
    }
    else{
        $computerName = [System.Net.Dns]::GetHostName()
        $firstMillionPrimes = Sum-FirstMillionPrimes
        Write-Output "$computerName - $firstMillionPrimes"
    }
} -AsJob | Out-Null

# In addition to the jobs started on multiple sessions, this also needs
# to be robust enough to handle other jobs running locally.
Start-Job -ScriptBlock { Sum-FirstMillionPrimes } | Out-Null

# It also needs to handle jobs created by Start-ThreadJob
Start-ThreadJob -ScriptBlock { Sum-FirstMillionPrimes } | Out-Null

# It also needs to handle jobs that have a state of Failed
Start-ThreadJob -ScriptBlock { throw "My job State will be Failed" } | Out-Null

# It should handle nested jobs that are successful
Start-Job -ScriptBlock { Start-ThreadJob -ScriptBlock { Sum-FirstMillionPrimes } | Receive-Job -Wait} | Out-Null
Start-Job -ScriptBlock { Start-Job -ScriptBlock { Sum-FirstMillionPrimes } | Receive-Job -Wait} | Out-Null
Start-ThreadJob -ScriptBlock { Start-ThreadJob -ScriptBlock { Sum-FirstMillionPrimes } | Receive-Job -Wait} | Out-Null

# It should handle nested jobs that are failures
Start-Job -ScriptBlock { Start-ThreadJob -ScriptBlock { throw "Handles nested thread jobs that fail" } | Receive-Job -Wait} | Out-Null
Start-Job -ScriptBlock { Start-Job -ScriptBlock { throw "Handles nested jobs that fail" } | Receive-Job -Wait} | Out-Null
Start-ThreadJob -ScriptBlock { Start-ThreadJob -ScriptBlock { throw "Handles nested thread jobs in thread jobs that fail" } | Receive-Job -Wait} | Out-Null
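
Expected output (simulated), which is returned to the terminal as processing completes. For the exceptions it is almost instantaneous, but for the long-running calculations the results may be interspersed as they complete: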

This is a remote invoke FAILURE on MachineA
    + CategoryInfo          : OperationStopped: (This is a remote invoke FAILURE on MachineA:String) [], RuntimeException
    + FullyQualifiedErrorId : This is a remote invoke FAILURE on MachineA
    + PSComputerName        : MachineA
My job State will be Failed
    + CategoryInfo          : InvalidResult: (:) [], RuntimeException
    + FullyQualifiedErrorId : JobStateFailed
Handles nested thread jobs that fail
    + CategoryInfo          : InvalidResult: (:) [], RuntimeException
    + FullyQualifiedErrorId : JobStateFailed
Handles nested jobs that fail
    + CategoryInfo          : InvalidResult: (:) [], RuntimeException
    + FullyQualifiedErrorId : JobStateFailed
Handles nested thread jobs in thread jobs that fail
    + CategoryInfo          : InvalidResult: (:) [], RuntimeException
    + FullyQualifiedErrorId : JobStateFailed
Localhost - (FirstMillionPrimes)
MachineC - (FirstMillionPrimes)
Localhost - (FirstMillionPrimes)
Localhost - (FirstMillionPrimes)
MachineC - (FirstMillionPrimes)
Localhost - (FirstMillionPrimes)
MachineB - (FirstMillionPrimes)
Localhost - (FirstMillionPrimes)
MachineB - (FirstMillionPrimes)
MachineA - (FirstMillionPrimes)

The solution we came up with seems to work, but it feels very heavy-handed. Is there a better way or pattern in PowerShell to yield results as they complete?

Dan*_*iel 2

It sounds like the PSRemotingJob.StateChanged event might work for you. Something like this:

$global:results = [System.Collections.ArrayList]::new()

# create action scriptblock for eventhandling
$onJobFinish = {
    # only run action if job has terminated
    if ($Event.Sender.State -in @('Completed', 'Failed', 'Stopped', 'Suspended', 'Disconnected')) {
        $localResults = $Event.Sender | Receive-Job

        # immediately send output to screen
        $localResults | Out-Host

        # also add output to collection to work with later
        $global:results.Add($localResults) | Out-Null
    }
}

Invoke-Command -Session $sessions -ScriptBlock {
    $computerName = [System.Net.Dns]::GetHostName()
    $firstMillionPrimes = Sum-FirstMillionPrimes
    Write-Output "$computerName - $firstMillionPrimes"
} -AsJob |
    Select-Object -ExpandProperty ChildJobs | ForEach-Object {
        # Register our action to run whenever a child job's state changes
        Register-ObjectEvent -InputObject $_ -EventName 'StateChanged' -Action $onJobFinish
    }

Start-Job -ScriptBlock { Sum-FirstMillionPrimes } | Select-Object -ExpandProperty ChildJobs | ForEach-Object {
    # Register our action to run whenever a child job's state changes
    Register-ObjectEvent -InputObject $_ -EventName 'StateChanged' -Action $onJobFinish
}

# access all results that have been received thus far
$global:results | Format-Table
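For reference, here is a minimal local sketch of the same event pattern, including cleanup of the event subscription afterwards. It assumes a single Start-Job as a stand-in for the remote jobs; the names $global:demoResults and DemoJobStateChanged are made up for this example.

```powershell
$global:demoResults = [System.Collections.ArrayList]::new()

# Hypothetical stand-in workload: a local job that takes a couple of seconds.
$job = Start-Job -ScriptBlock { Start-Sleep -Seconds 2; 'done' }

# Subscribe to the child job's StateChanged event, as in the code above.
$eventJob = Register-ObjectEvent -InputObject $job.ChildJobs[0] -EventName 'StateChanged' `
    -SourceIdentifier 'DemoJobStateChanged' -Action {
        if ($Event.Sender.State -in @('Completed', 'Failed', 'Stopped')) {
            $output = $Event.Sender | Receive-Job
            $global:demoResults.Add($output) | Out-Null
        }
    }

# Poll until the action has stored something or a 30-second timeout passes.
$deadline = (Get-Date).AddSeconds(30)
while ($global:demoResults.Count -eq 0 -and (Get-Date) -lt $deadline) {
    Start-Sleep -Milliseconds 200
}

$global:demoResults

# Drop the subscription, then remove the event job and the worker job.
Unregister-Event -SourceIdentifier 'DemoJobStateChanged'
Remove-Job -Job $eventJob, $job -Force
```

Passing an explicit -SourceIdentifier is a choice made here so that the subscription can be removed by name later.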

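Update

You could also do something like this, where you just add all of the jobs to a single collection and loop while they are running or have data. This way you can output data as it becomes available instead of having to wait for the jobs to complete.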

$jobs = @()
$jobs += Invoke-Command -ScriptBlock $sb -ComputerName $computers -AsJob
$jobs += Start-Job -ScriptBlock $sb2
$jobs += Start-ThreadJob -ScriptBlock $sb3

$results = [System.Collections.ArrayList]::new()

while ($jobs | Where-Object {
        $_.State -notin @('Completed', 'Failed', 'Stopped', 'Suspended', 'Disconnected')
    }) {
    # Receive-Job returns only the output that has not been received yet
    $localData = $jobs | Receive-Job
    if ($null -ne $localData) {
        $localData | Format-Table
        $results.Add($localData) | Out-Null
    }

    Start-Sleep -Seconds 1
}
# Collect any output produced between the last poll and the jobs finishing
$localData = $jobs | Receive-Job
if ($null -ne $localData) {
    $localData | Format-Table
    $results.Add($localData) | Out-Null
}
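A variation on this loop that avoids the fixed one-second poll is to block on Wait-Job -Any and drop each job from a pending list once its output has been received. This is a sketch under assumptions, not something tested against Invoke-Command: the three local jobs, their names, and their delays are invented stand-ins for the remote machines. For an Invoke-Command -AsJob parent, the pending list would presumably be seeded from its ChildJobs property, with the parent removed after the loop.

```powershell
# Hypothetical stand-ins for the remote machines: local jobs with different run times.
$delays = [ordered]@{ SlowJob = 3; MediumJob = 2; FastJob = 1 }
$pending = [System.Collections.Generic.List[System.Management.Automation.Job]]::new()
foreach ($name in $delays.Keys) {
    $job = Start-Job -Name $name -ArgumentList $name, $delays[$name] -ScriptBlock {
        param($label, $seconds)
        Start-Sleep -Seconds $seconds
        "$label finished after $seconds s"
    }
    $pending.Add($job)
}

$completionOrder = [System.Collections.Generic.List[string]]::new()
while ($pending.Count -gt 0) {
    # Blocks until at least one of the pending jobs has finished.
    $finished = @(Wait-Job -Job $pending -Any)
    foreach ($job in $finished) {
        # Emit this job's output right away instead of waiting for the others.
        Receive-Job -Job $job
        $completionOrder.Add($job.Name)
        $pending.Remove($job) | Out-Null
        Remove-Job -Job $job
    }
}
```

Tracking the pending jobs in a list plays the same role as the HashSet of processed IDs in the question, but the loop ends on its own once the list is empty.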