asm*_*asm 4 haskell semaphore shelly
我正在编写一个简单的脚本来使用Shelly库并行运行大量任务,但我想限制任何时候运行的最大任务数.该脚本在每行上获取一个带有输入的文件,并为该输入运行任务.文件中有几百个输入,我想一次限制到大约16个进程.
当前脚本实际上使用初始计数为1的QSem限制为1(很好地尝试).我似乎错过了一些东西,因为当我运行带有4个输入的测试文件时,我看到:
Starting Starting Starting Starting Done Done Done Done
所以线程没有像我预期的那样在QSem上阻塞,它们都是同时运行的.我甚至已经实现了我自己的信号量MVar,TVar并且都没有像我预期的那样工作.我显然缺少一些基本的但是什么?我也尝试编译代码并将其作为二进制文件运行.
#!/usr/bin/env runhaskell
{-# LANGUAGE TemplateHaskell, QuasiQuotes, DeriveDataTypeable, OverloadedStrings #-}
import Shelly
import Prelude hiding (FilePath)
import Text.Shakespeare.Text (lt)
import qualified Data.Text.Lazy as LT
import Control.Monad (forM)
import System.Environment (getArgs)
import qualified Control.Concurrent.QSem as QSem
import Control.Concurrent (forkIO, MVar, putMVar, newEmptyMVar, takeMVar)
-- Define max number of simultaneous processes
maxProcesses :: IO QSem.QSem
maxProcesses = QSem.newQSem 1
bkGrnd :: ShIO a -> ShIO (MVar a)
bkGrnd proc = do
mvar <- liftIO newEmptyMVar
_ <- liftIO $ forkIO $ do
-- Block until there are free processes
sem <- maxProcesses
QSem.waitQSem sem
putStrLn "Starting"
-- Run the shell command
result <- shelly $ silently proc
liftIO $ putMVar mvar result
putStrLn "Done"
-- Signal that this process is done and another can run.
QSem.signalQSem sem
return mvar
main :: IO ()
main = shelly $ silently $ do
[img, file] <- liftIO $ getArgs
contents <- readfile $ fromText $ LT.pack file
-- Run a backgrounded process for each line of input.
results <- forM (LT.lines contents) $ \line -> bkGrnd $ do
runStdin <command> <arguments>
liftIO $ mapM_ takeMVar results
正如我在评论中所说,每次调用bkGrnd都会创建自己的semaphonre,允许每个线程继续而无需等待.我会尝试这样的事情,其中信号量是在main每次创建的时候传递的bkGrnd.
bkGrnd :: QSem.QSem -> ShIO a -> ShIO (MVar a)
bkGrnd sem proc = do
mvar <- liftIO newEmptyMVar
_ <- liftIO $ forkIO $ do
-- Block until there are free processes
QSem.waitQSem sem
--
-- code continues as before
--
main :: IO ()
main = shelly $ silently $ do
[img, file] <- liftIO $ getArgs
contents <- readfile $ fromText $ LT.pack file
sem <- maxProcesses
-- Run a backgrounded process for each line of input.
results <- forM (LT.lines contents) $ \line -> bkGrnd sem $ do
runStdin <command> <arguments>
liftIO $ mapM_ takeMVar results
Run Code Online (Sandbox Code Playgroud)