Mic*_*elO 7 multithreading haskell stm
我有一个TChan作为一个线程的输入,应该像这样:
如果sombody在特定时间内写入TChan,则应检索内容.如果在指定时间内没有任何内容写入,则应取消阻止并继续Nothing.
我对此的尝试是使用如下的超时功能System.Timeout:
timeout 1000000 $ atomically $ readTChan pktChannel
Run Code Online (Sandbox Code Playgroud)
这似乎工作,但现在我发现,我有时会丢失数据包(它们写入通道,但不会在另一侧读取.在日志中我得到这个:
2014.063.11.53.43.588365 Pushing Recorded Packet: 2 1439
2014.063.11.53.43.592319 Run into timeout
2014.063.11.53.44.593396 Run into timeout
2014.063.11.53.44.593553 Pushing Recorded Packet: 3 1439
2014.063.11.53.44.597177 Sending Recorded Packet: 3 1439
Run Code Online (Sandbox Code Playgroud)
其中"推送记录的数据包"是从一个线程写入,"发送记录的数据包"是从发送方线程中的TChan读取.Sending Recorded Packet 2 1439缺少该行,这表示从TChan成功读取.
似乎如果在错误的时间点收到超时,则通道丢失该数据包.我怀疑threadKill内部使用的功能timeout和STM不能很好地协同工作.
它是否正确?有人有另一种不会丢失数据包的解决方案吗?
使用registerDelaySTM功能来指示TVar何时达到超时.然后,您可以使用该orElse函数或Alternative运算符<|>在下一个TChan值或超时之间进行选择.
import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import System.Random
-- write random values after a random delay
packetWriter :: Int -> TChan Int -> IO ()
packetWriter maxDelay chan = do
let xs = randomRs (10000 :: Int, maxDelay + 50000) (mkStdGen 24036583)
forM_ xs $ \ x -> do
threadDelay x
atomically $ writeTChan chan x
-- block (retry) until the delay TVar is set to True
fini :: TVar Bool -> STM ()
fini = check <=< readTVar
-- Read the next value from a TChan or timeout
readTChanTimeout :: Int -> TChan a -> IO (Maybe a)
readTChanTimeout timeoutAfter pktChannel = do
delay <- registerDelay timeoutAfter
atomically $
Just <$> readTChan pktChannel
<|> pure Nothing <* fini delay
-- | Print packets until a timeout is reached
readLoop :: Show a => Int -> TChan a -> IO ()
readLoop timeoutAfter pktChannel = do
res <- readTChanTimeout timeoutAfter pktChannel
case res of
Nothing -> putStrLn "timeout"
Just val -> do
putStrLn $ "packet: " ++ show val
readLoop timeoutAfter pktChannel
main :: IO ()
main = do
let timeoutAfter = 1000000
-- spin up a packet writer simulation
pktChannel <- newTChanIO
tid <- forkIO $ packetWriter timeoutAfter pktChannel
readLoop timeoutAfter pktChannel
killThread tid
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
512 次 |
| 最近记录: |