Imp*_*ive 8 sockets haskell conduit network-conduit
下面是一些使用实现了小型接收服务器代码conduit,network-conduit和stm-conduit.它在套接字上接收数据,然后通过STM通道将其流式传输到主线程.
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)
import System.Directory (removeFile)
import System.IO
type BSChan = TBMChan ByteString
listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
chan <- atomically $ newTBMChan bufSize
forkListener chan
return chan
where
forkListener chan = void . forkIO $ listen soc 2 >> loop where
loop = do
(conn, _) <- accept soc
sourceSocket conn $$ sinkTBMChan chan
close conn
loop
main :: IO ()
main = do
soc <- socket AF_UNIX Stream 0
bind soc (SockAddrUnix "mysock")
socChan <- listenSocket soc 8
sourceTBMChan socChan $$ DCB.sinkHandle stdout
removeFile "mysock"
Run Code Online (Sandbox Code Playgroud)
(在实际应用程序中,来自套接字的数据流与其他一些数据流合并,这就是我不直接在侦听器中处理它的原因).
问题是,在我预期它将保持打开直到主线程被杀死的地方,而不是在套接字上收到第一条消息后退出.我无法弄清楚为什么会这样做,除非是看到第一个数据流结束后接收器(在第2行到最后一行)退出.我可以说服它不要这样做吗?有一些Conduit关于使源可恢复的东西,但不是一个接收器.
来自以下文件sinkTBMChan:
当水槽关闭时,通道也会关闭.
因此,当第一插座手柄关闭时,它使Source从sourceSocket关闭,在关闭连接的信宿这又关闭TBMChan传播到sinkHandle停止水槽.
解决此问题的最简单方法可能是将您更改loop为不在连接之间关闭的自定义源,并将该源连接到TBMChan.
listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
chan <- atomically $ newTBMChan bufSize
forkListener chan
return chan
where
forkListener chan = void . forkIO $ do
listen soc 2
loop $$ sinkTBMChan chan
loop = do
(conn, _) <- liftIO $ accept soc
sourceSocket conn
liftIO $ close conn
loop
Run Code Online (Sandbox Code Playgroud)
协调从通道中关闭作家和读者的问题并非易事,但是您可以重用pipes生态系统中的解决方案来解决此问题,即使用pipes-concurrency库。该库提供了几个pipes独立的实用程序,您可以将它们与conduit库一起重用,以在读取器和写入器之间进行通信,从而使每一端都可以自动正确地知道何时清理,也可以手动清理任一端。
您从pipes-concurrency库中使用的关键功能是spawn。它的类型是:
spawn :: Buffer a -> IO (Output a, Input a)
Run Code Online (Sandbox Code Playgroud)
在Buffer什么潜在的STM通道抽象指定使用。从示例代码来看,这听起来像您想要一个Bounded缓冲区:
spawn (Bounded 8) :: IO (Output a, Input a)
Run Code Online (Sandbox Code Playgroud)
本a可以在这种情况下任何东西,所以它可以是一个ByteString,例如:
spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)
Run Code Online (Sandbox Code Playgroud)
在Input和Output像一个邮箱的行为。您的邮箱通过邮件添加send荷兰国际集团的数据到OutputS按你走的消息从邮箱中(FIFO顺序),recv荷兰国际集团的数据InputS:
-- Returns `False` if the mailbox is sealed
send :: Output a -> a -> STM Bool
-- Returns `Nothing` if the mailbox is sealed
recv :: Input a -> STM (Maybe a)
Run Code Online (Sandbox Code Playgroud)
的pipes-concurrency一个很好的功能是,如果邮箱没有读取器或没有写入器,它会检测垃圾收集器以自动密封邮箱。这避免了常见的死锁源。
如果使用的是pipes生态系统,通常将使用以下两个更高级别的实用程序来读写邮箱。
-- Stream values into the mailbox until it is sealed
toOutput :: Output a -> Consumer a IO ()
-- Stream values from the mailbox until it is sealed
fromInput :: Input a -> Producer a IO ()
Run Code Online (Sandbox Code Playgroud)
但是,由于核心机制是pipes独立的,因此您可以重写conduit这些功能的等效版本:
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Pipes.Concurrent
toOutput' :: Output a -> Sink a IO ()
toOutput' o = awaitForever (\a -> lift $ atomically $ send o a)
fromInput' :: Input a -> Source IO a
fromInput' i = do
ma <- lift $ atomically $ recv i
case ma of
Nothing -> return ()
Just a -> do
yield a
fromInput' i
Run Code Online (Sandbox Code Playgroud)
然后,您的主要功能将如下所示:
main :: IO ()
main = do
soc <- socket AF_UNIX Stream 0
bind soc (SockAddrUnix "mysock")
(output, input) <- spawn (Bounded 8)
forkIO $ readFromSocket soc $$ toOutput output
fromInput input $$ DCB.sinkHandle stdout
removeFile "mysock"
Run Code Online (Sandbox Code Playgroud)
...这里readFromSocket是一些Source从您的阅读Socket。
然后,您可以自由地写入output数据的使用其他来源也一样,而不用担心有协调他们或的处理input或output当你处理得当。
要了解更多信息pipes-concurrency,建议阅读官方教程。