如何将原子计数器添加到 powershell ForEach -Parallel 循环

Ryu*_* S. 5 parallel-processing powershell multithreading

在这个问题中,解释了如何添加到并发ThreadSafe集合Powershell:如何将结果添加到数组(ForEach-Object -Parallel)

我有一个更简单的用例,我只想增加一个值。(整数)。

是否可以在 Powershell 中使用某种原子整数数据类型来完成?

$myAtomicCounter = 0

$myItems | ForEach-Object -Parallel {
    #...other work

    $myAtomicCounter.ThreadSafeAdd(2)

    # .. some more work using counter
}

Write-Host($myAtomicCounter)
Run Code Online (Sandbox Code Playgroud)

San*_*zon 7

例如,在 PowerShell 中,当从多个线程更新单个值时,您必须使用锁定机制,Mutex否则SemaphoreSlim更新Monitor.Enter操作将不是线程安全的同步哈希表不能确保更新键值是线程安全的

下面是一个简单的演示,证明了上述内容:

$sync = [hashtable]::Synchronized(@{ })
$attempts = 0

do {
    $sync['Value'] = 0
    $attempts++
    0..10 | ForEach-Object -Parallel {
        $sync = $using:sync
        Start-Sleep -Milliseconds 200
        $sync['Value']++
    } -ThrottleLimit 11
}
while ($sync['Value'] -eq 11)

"It took $attempts attempts to fail..."
Run Code Online (Sandbox Code Playgroud)

假设我们有一个数组的数组:

$toProcess = 0..10 | ForEach-Object {
    , (Get-Random -Count (Get-Random -Minimum 5 -Maximum 10))
}
Run Code Online (Sandbox Code Playgroud)

如果您想跟踪每个数组中已处理的项目,可以使用以下方法来实现Mutex

$processedItems = [hashtable]::Synchronized(@{
    Lock    = [System.Threading.Mutex]::new()
    Counter = 0
})

$toProcess | ForEach-Object -Parallel {
    # using sleep as to emulate doing something here
    Start-Sleep (Get-Random -Maximum 5)

    # bring the local variable to this scope
    $ref = $using:processedItems
    # lock this thread until I can write
    if($ref['Lock'].WaitOne()) {
        # when I can write, update the value
        $ref['Counter'] += $_.Count
        # and realease this lock so others threads can write
        $ref['Lock'].ReleaseMutex()
    }
}

$processedCount = ($toProcess | Write-Output | Measure-Object).Count

# Should be True:
$processedItems['Counter'] -eq $processedCount
Run Code Online (Sandbox Code Playgroud)

Monitor.Enter使用尝试类似于 C#lock语句的自定义函数来安全递增计数器的另一个示例:

function lock {
    param(
        [Parameter(Mandatory)]
        [object] $Object,

        [Parameter(Mandatory)]
        [scriptblock] $ScriptBlock
    )

    try {
        [System.Threading.Monitor]::Enter($Object)
        & $ScriptBlock
    }
    finally {
        [System.Threading.Monitor]::Exit($Object)
    }
}

$utils = [hashtable]::Synchronized(@{
    LockFunc = $function:lock.ToString()
    Counter  = @(0)
})

$toProcess | ForEach-Object -Parallel {
    # bring the utils var to this scope
    $utils = $using:utils
    # define the `lock` function here
    $function:lock = $utils['LockFunc']

    Start-Sleep (Get-Random -Maximum 5)

    # lock the counter array
    lock($utils['Counter'].SyncRoot) {
        # increment and release when done
        $utils['Counter'][0] += $_.Count
    }
}

$processedCount = ($toProcess | Write-Output | Measure-Object).Count

# Should be True:
$utils['Counter'][0] -eq $processedCount
Run Code Online (Sandbox Code Playgroud)

PowerShell 中一种更简单的方法是将并行循环输出到线性循环,您可以在其中安全地更新计数器,而不必关心线程安全:

$counter = 0

$toProcess | ForEach-Object -Parallel {
    # using sleep as to emulate doing something here
    Start-Sleep (Get-Random -Maximum 5)

    # when this thread is done,
    # output this array of processed items
    $_
    
} | ForEach-Object {
    # then the output from the parallel loop is received in this linear
    # thread safe loop where we can update the counter
    $counter += $_.Count
}

$processedCount = ($toProcess | Write-Output | Measure-Object).Count

# Should be True:
$counter -eq $processedCount
Run Code Online (Sandbox Code Playgroud)

  • mmm 似乎在 powershell 中表现不佳 @zett42 我没有得到一致的结果 `$i = [ref] 0; 0..100 | 0..100 ForEach-Object -Parallel { $i = $using:i; 开始-睡眠(获取随机-最大 4);$null = [System.Threading.Interlocked]::Increment($i) }` 有时按预期为 101,其他则小于该值 (2认同)