使用带有超时的TChan

Mic*_*elO 7 multithreading haskell stm

我有一个TChan作为一个线程的输入,应该像这样:

如果sombody在特定时间内写入TChan,则应检索内容.如果在指定时间内没有任何内容写入,则应取消阻止并继续Nothing.

我对此的尝试是使用如下的超时功能System.Timeout:

timeout 1000000 $ atomically $ readTChan pktChannel
Run Code Online (Sandbox Code Playgroud)

这似乎工作,但现在我发现,我有时会丢失数据包(它们写入通道,但不会在另一侧读取.在日志中我得到这个:

2014.063.11.53.43.588365 Pushing Recorded Packet: 2 1439
2014.063.11.53.43.592319 Run into timeout
2014.063.11.53.44.593396 Run into timeout
2014.063.11.53.44.593553 Pushing Recorded Packet: 3 1439
2014.063.11.53.44.597177 Sending Recorded Packet: 3 1439
Run Code Online (Sandbox Code Playgroud)

其中"推送记录的数据包"是从一个线程写入,"发送记录的数据包"是从发送方线程中的TChan读取.Sending Recorded Packet 2 1439缺少该行,这表示从TChan成功读取.

似乎如果在错误的时间点收到超时,则通道丢失该数据包.我怀疑threadKill内部使用的功能timeout和STM不能很好地协同工作.

它是否正确?有人有另一种不会丢失数据包的解决方案吗?

Nat*_*ell 6

使用registerDelaySTM功能来指示TVar何时达到超时.然后,您可以使用该orElse函数或Alternative运算符<|>在下一个TChan值或超时之间进行选择.

import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import System.Random

-- write random values after a random delay
packetWriter :: Int -> TChan Int -> IO ()
packetWriter maxDelay chan = do
  let xs = randomRs (10000 :: Int, maxDelay + 50000) (mkStdGen 24036583)
  forM_ xs $ \ x -> do
    threadDelay x
    atomically $ writeTChan chan x

-- block (retry) until the delay TVar is set to True
fini :: TVar Bool -> STM ()
fini = check <=< readTVar

-- Read the next value from a TChan or timeout
readTChanTimeout :: Int -> TChan a -> IO (Maybe a)
readTChanTimeout timeoutAfter pktChannel = do
  delay <- registerDelay timeoutAfter
  atomically $
        Just <$> readTChan pktChannel
    <|> pure Nothing <* fini delay

-- | Print packets until a timeout is reached
readLoop :: Show a => Int -> TChan a -> IO ()
readLoop timeoutAfter pktChannel = do
  res <- readTChanTimeout timeoutAfter pktChannel
  case res of
    Nothing -> putStrLn "timeout"
    Just val -> do
      putStrLn $ "packet: " ++ show val
      readLoop timeoutAfter pktChannel

main :: IO ()
main = do
  let timeoutAfter = 1000000

  -- spin up a packet writer simulation
  pktChannel <- newTChanIO
  tid <- forkIO $ packetWriter timeoutAfter pktChannel

  readLoop timeoutAfter pktChannel

  killThread tid
Run Code Online (Sandbox Code Playgroud)

  • `pure Nothing &lt;* fini delay` 可以写成 `Nothing &lt;$ fini delay` 所以它看起来更像是 `Just` 情况 (2认同)