导管和插座:允许多个连接

Imp*_*ive 8 sockets haskell conduit network-conduit

下面是一些使用实现了小型接收服务器代码conduit,network-conduitstm-conduit.它在套接字上接收数据,然后通过STM通道将其流式传输到主线程.

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class

import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)

import System.Directory (removeFile)
import System.IO

type BSChan = TBMChan ByteString

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ listen soc 2 >> loop where 
      loop = do
        (conn, _) <- accept soc
        sourceSocket conn $$ sinkTBMChan chan
        close conn
        loop

main :: IO ()
main = do
  soc <- socket AF_UNIX Stream 0
  bind soc (SockAddrUnix "mysock")
  socChan <- listenSocket soc 8
  sourceTBMChan socChan $$ DCB.sinkHandle stdout
  removeFile "mysock"
Run Code Online (Sandbox Code Playgroud)

(在实际应用程序中,来自套接字的数据流与其他一些数据流合并,这就是我不直接在侦听器中处理它的原因).

问题是,在我预期它将保持打开直到主线程被杀死的地方,而不是在套接字上收到第一条消息后退出.我无法弄清楚为什么会这样做,除非是看到第一个数据流结束后接收器(在第2行到最后一行)退出.我可以说服它不要这样做吗?有一些Conduit关于使源可恢复的东西,但不是一个接收器.

sha*_*ang 7

来自以下文件sinkTBMChan:

当水槽关闭时,通道也会关闭.

因此,当第一插座手柄关闭时,它使SourcesourceSocket关闭,在关闭连接的信宿这又关闭TBMChan传播到sinkHandle停止水槽.

解决此问题的最简单方法可能是将您更改loop为不在连接之间关闭的自定义源,并将该源连接到TBMChan.

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ do
      listen soc 2
      loop $$ sinkTBMChan chan

    loop = do
      (conn, _) <- liftIO $ accept soc
      sourceSocket conn
      liftIO $ close conn
      loop
Run Code Online (Sandbox Code Playgroud)


Gab*_*lez 5

协调从通道中关闭作家和读者的问题并非易事,但是您可以重用pipes生态系统中的解决方案来解决此问题,即使用pipes-concurrency库。该库提供了几个pipes独立的实用程序,您可以将它们与conduit库一起重用,以在读取器和写入器之间进行通信,从而使每一端都可以自动正确地知道何时清理,也可以手动清理任一端。

您从pipes-concurrency库中使用的关键功能是spawn。它的类型是:

spawn :: Buffer a -> IO (Output a, Input a)
Run Code Online (Sandbox Code Playgroud)

Buffer什么潜在的STM通道抽象指定使用。从示例代码来看,这听起来像您想要一个Bounded缓冲区:

spawn (Bounded 8) :: IO (Output a, Input a)
Run Code Online (Sandbox Code Playgroud)

a可以在这种情况下任何东西,所以它可以是一个ByteString,例如:

spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)
Run Code Online (Sandbox Code Playgroud)

InputOutput像一个邮箱的行为。您的邮箱通过邮件添加send荷兰国际集团的数据到OutputS按你走的消息从邮箱中(FIFO顺序),recv荷兰国际集团的数据InputS:

-- Returns `False` if the mailbox is sealed
send :: Output a -> a -> STM Bool

-- Returns `Nothing` if the mailbox is sealed
recv :: Input a -> STM (Maybe a)
Run Code Online (Sandbox Code Playgroud)

pipes-concurrency一个很好的功能是,如果邮箱没有读取器或没有写入器,它会检测垃圾收集器以自动密封邮箱。这避免了常见的死锁源。

如果使用的是pipes生态系统,通常将使用以下两个更高级别的实用程序来读写邮箱。

-- Stream values into the mailbox until it is sealed
toOutput :: Output a -> Consumer a IO ()

-- Stream values from the mailbox until it is sealed
fromInput :: Input a -> Producer a IO ()
Run Code Online (Sandbox Code Playgroud)

但是,由于核心机制是pipes独立的,因此您可以重写conduit这些功能的等效版本:

import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Pipes.Concurrent

toOutput' :: Output a -> Sink a IO ()
toOutput' o = awaitForever (\a -> lift $ atomically $ send o a)

fromInput' :: Input a -> Source IO a
fromInput' i = do
    ma <- lift $ atomically $ recv i
    case ma of
        Nothing -> return ()
        Just a  -> do
            yield a
            fromInput' i
Run Code Online (Sandbox Code Playgroud)

然后,您的主要功能将如下所示:

main :: IO ()
main = do
    soc <- socket AF_UNIX Stream 0
    bind soc (SockAddrUnix "mysock")
    (output, input) <- spawn (Bounded 8)
    forkIO $ readFromSocket soc $$ toOutput output
    fromInput input $$ DCB.sinkHandle stdout
  removeFile "mysock"
Run Code Online (Sandbox Code Playgroud)

...这里readFromSocket是一些Source从您的阅读Socket

然后,您可以自由地写入output数据的使用其他来源也一样,而不用担心有协调他们或的处理inputoutput当你处理得当。

要了解更多信息pipes-concurrency,建议阅读官方教程