我正在向此Haskell服务器发送简单的UDP数据包.对于数据包源,我使用由"aspell -l en dump master"生成的纯文本文件.但是,任何超过120,000条消息的列表都应该有效.如果我同时启动消费者和生产者,我不会丢失数据包.但是,我希望能够模拟一个非常忙碌的消费者.如果我在启动消费者之前引入threadDelay 20秒,我会丢失数据包.这对我来说是反直觉的,因为当我延迟消耗时,我在标准输出和磁盘IO方面做得更少.谁能解释为什么我会因延迟版本而受损?我如何管理套接字和TChan更好地工作,以便在我的消费者非常忙的时候不会有任何损失(只是更高的内存使用率)?
import Control.Monad (forever)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (writeTChan, readTChan, atomically)
import Control.Concurrent.STM.TChan
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import Data.ByteString hiding(putStrLn, head)
import qualified Data.ByteString.Char8 as Char8 (putStrLn, putStr)
import System.IO
main :: IO ()
main = withSocketsDo $ do
hSetBuffering stdout NoBuffering
addrinfos <- getAddrInfo
(Just (defaultHints {addrFlags = [AI_PASSIVE]}))
Nothing (Just "2000")
let serveraddr = head addrinfos
sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
bindSocket sock (addrAddress …Run Code Online (Sandbox Code Playgroud) 我在代码中看到了一个常见的模式.我已经从数据库中排序结果,我需要在嵌套结构中发出它们.我想要这个流,所以我想一次在内存中记录少量记录.使用TravesableLike.groupBy假设数据没有排序,所以它不必要地填充可变映射.我想保持这种真正的流媒体.scalaz-stream在这里有用吗?
val sql = """select grandparent_id, parent_id, child_id
from children
where grandparent_id = ?
order by grandparent_id, parent_id, child_id"""
def elementsR[P, R](invoker: Invoker[P, R], param: P): Process[Task, R] =
// Invoker.elements returns trait CloseableIterator[+T] extends Iterator[T] with Closeable
resource(Task.delay(invoker.elements(param)))(
src => Task.delay(src.close)) { src =>
Task.delay { if (src.hasNext) src.next else throw End }
}
def dbWookie {
// grandparent_id, (grandparent_id, parent_id, child_id)
val invoker = Q.query[Int, (Int, Int, Int)](sql)
val es = elementsR(invoker, 42)
// ?, ?, ?
// …Run Code Online (Sandbox Code Playgroud)