Haskell中并发通道的严格评估技术

chr*_*gue 4 concurrency haskell strictness

我正在玩Haskell线程,我遇到了在通道中传递延迟评估值的问题.例如,使用N个工作线程和1个输出线程,工作人员传达未评估的工作,输出线程最终为他们完成工作.

我已经在各种文档中看到了这个问题并看到了各种解决方案,但我发现只有一个解决方案可行,其余解决方案则没有.下面是一些代码,其中工作线程开始一些可能需要很长时间的计算.我按降序启动线程,这样第一个线程应该占用最长的,后面的线程应该更早完成.

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan   -- .Strict
import Control.Concurrent.MVar
import Control.Exception (finally, evaluate)
import Control.Monad (forM_)
import Control.Parallel.Strategies (using, rdeepseq)

main = (>>=) newChan $ (>>=) (newMVar []) . run

run :: Chan (Maybe String) -> MVar [MVar ()] -> IO ()
run logCh statVars = do
  logV <- spawn1 readWriteLoop
  say "START"
  forM_ [18,17..10] $ spawn . busyWork
  await
  writeChan logCh Nothing -- poison the logger
  takeMVar logV
  putStrLn "DONE"
  where
    say mesg = force mesg >>= writeChan logCh . Just

    force s = mapM evaluate s  -- works
--    force s = return $ s `using` rdeepseq  -- no difference
--    force s = return s -- no-op; try this with strict channel

    busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen
    embiggen i = i*i*i*i*i

    readWriteLoop = readChan logCh >>= writeReadLoop
    writeReadLoop Nothing = return ()
    writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop

    spawn1 action = do
      v <- newEmptyMVar
      forkIO $ action `finally` putMVar v ()
      return v

    spawn action = do
      v <- spawn1 action
      modifyMVar statVars $ \vs -> return (v:vs, ())

    await = do
      vs <- modifyMVar statVars $ \vs -> return ([], vs)
      mapM_ takeMVar vs
Run Code Online (Sandbox Code Playgroud)

使用大多数技术,结果按产生的顺序报告; 也就是说,首先是运行时间最长的计算.我解释这意味着输出线程正在做所有的工作:

-- results in order spawned (longest-running first = broken)
START
892616806655
503999185040
274877906943
144162977343
72313663743
34464808608
15479341055
6484436675
2499999999
DONE
Run Code Online (Sandbox Code Playgroud)

我认为这个问题的答案是严格的渠道,但它们没有用.我知道字符串的WHNF是不够的,因为它只会强制最外层的构造函数(字符串的第一个字符为nil或cons).本rdeepseq应该全面评估,但它并没有区别.我发现唯一有效的方法是映射Control.Exception.evaluate :: a -> IO a字符串中的所有字符.(有关force几种不同的选择,请参阅代码中的函数注释.)以下是结果Control.Exception.evaluate:

-- results in order finished (shortest-running first = correct)
START
2499999999
6484436675
15479341055
34464808608
72313663743
144162977343
274877906943
503999185040
892616806655
DONE
Run Code Online (Sandbox Code Playgroud)

那么为什么没有严格的渠道或rdeepseq产生这样的结果呢?还有其他技术吗?我误解了为什么第一个结果被打破了吗?

Edw*_*ang 5

这里有两个问题.

第一次尝试(使用显式rnf)不起作用的原因是,通过使用return,您创建了一个在评估时完全评估自身的thunk,但thunk本身尚未被评估.请注意,评估的类型是a -> IO a:它以可以强加排序的IO方式返回值evaluate:

return (error "foo")   >> return 1 == return 1
evaluate (error "foo") >> return 1 == error "foo"
Run Code Online (Sandbox Code Playgroud)

结果就是这段代码:

force s = evaluate $ s `using` rdeepseq
Run Code Online (Sandbox Code Playgroud)

将起作用(如同,具有相同的行为mapM_ evaluate s).


使用严格通道的情况有点棘手,但我认为这是由于严格并发中的错误.实际上,昂贵的计算是在工作线程上运行的,但是它并没有给你带来太多好处(你可以通过在字符串中隐藏一些异步异常并查看异常表面所在的线程来明确检查).

什么是虫子?我们来看看严格的代码writeChan:

writeChan :: NFData a => Chan a -> a -> IO ()
writeChan (Chan _read write) val = do
  new_hole <- newEmptyMVar
  modifyMVar_ write $ \old_hole -> do
    putMVar old_hole $! ChItem val new_hole
    return new_hole
Run Code Online (Sandbox Code Playgroud)

我们看到在评估thunk之前modifyMVar_已经调用write了它.那么操作的顺序是:

  1. writeChan 进入
  2. 我们takeMVar write(阻止任何想要写入频道的人)
  3. 我们评估昂贵的thunk
  4. 我们把昂贵的thunk放到了频道上
  5. 我们putMVar write,解锁所有其他线程

您没有看到evaluate变体的这种行为,因为它们在获取锁之前执行评估.

我会向Don发送关于此的邮件,看看他是否同意这种行为有点不理想.

唐同意这种行为不是最理想的.我们正在开发一个补丁.