我正在向此Haskell服务器发送简单的UDP数据包.对于数据包源,我使用由"aspell -l en dump master"生成的纯文本文件.但是,任何超过120,000条消息的列表都应该有效.如果我同时启动消费者和生产者,我不会丢失数据包.但是,我希望能够模拟一个非常忙碌的消费者.如果我在启动消费者之前引入threadDelay 20秒,我会丢失数据包.这对我来说是反直觉的,因为当我延迟消耗时,我在标准输出和磁盘IO方面做得更少.谁能解释为什么我会因延迟版本而受损?我如何管理套接字和TChan更好地工作,以便在我的消费者非常忙的时候不会有任何损失(只是更高的内存使用率)?
import Control.Monad (forever)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (writeTChan, readTChan, atomically)
import Control.Concurrent.STM.TChan
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import Data.ByteString hiding(putStrLn, head)
import qualified Data.ByteString.Char8 as Char8 (putStrLn, putStr)
import System.IO
main :: IO ()
main = withSocketsDo $ do
hSetBuffering stdout NoBuffering
addrinfos <- getAddrInfo
(Just (defaultHints {addrFlags = [AI_PASSIVE]}))
Nothing (Just "2000")
let serveraddr = head addrinfos
sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
bindSocket sock (addrAddress serveraddr)
chan <- newTChanIO
forkIO(producer chan sock)
-- Uncomment the threadDelay below to see lossy version
-- threadDelay (1000000 * 20)
forkIO(consumer chan)
forever $ threadDelay (1000000 * 60)
producer :: TChan ByteString -> Socket -> IO ()
producer chan sock = forever $ do
(msg) <- recv sock 256
atomically $ writeTChan chan msg
consumer :: TChan ByteString -> IO ()
consumer chan = forever $ do
msg <- atomically $ readTChan chan
Char8.putStr msg
Run Code Online (Sandbox Code Playgroud)
为什么您不希望数据丢失?这种情况在任何语言中都会发生,而不仅仅是 Haskell,因为内核为每个套接字只有有限的缓冲区空间。由于与 TCP 不同,UDP 不受流量控制,因此内核只会丢弃数据包。当然,您可以使用 SO_RCVBUF 套接字选项来增加缓冲区大小。但从根本上来说,UDP 并不可靠,因此任何使用它的实际应用程序都需要处理数据包丢失问题。
归档时间: |
|
查看次数: |
1016 次 |
最近记录: |