kva*_*nck 62 networking haskell tms stm conduit
在我的GHC Haskell应用程序中使用stm,network-conduit和conduit,我为每个socket自动分叉使用了一个strand runTCPServer.Strands可以通过使用广播TChan与其他线路进行通信.
这展示了我想如何建立管道"链":

所以,我们这里有两个源(每个都绑定到辅助管道),它们产生一个接受并转入的Packet对象,然后发出套接字.对于两个输入的有效(性能是一个问题),我遇到了很大的困难.encoderByteString
如果有人能指出我正确的方向,我将不胜感激.
既然我没有尝试就发布这个问题是不礼貌的,我会把我以前在这里尝试过的东西放进去;
我已经编写/编写了一个函数,它(阻塞)从TMChan(可关闭的通道)生成一个源;
-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)
Run Code Online (Sandbox Code Playgroud)
同样,将Chan转换为接收器的功能;
-- | Takes a stream and, given write and close functionality, returns a sink
--   which wil consume elements and broadcast them into the channel 
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink
Run Code Online (Sandbox Code Playgroud)
那么mergeSources很简单; fork 2个线程(我真的不想这么做,但是到底是什么)可以将他们的新项目放入一个列表中,然后我生成一个源代码;
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan
Run Code Online (Sandbox Code Playgroud)
虽然我成功地完成了这些功能的类型检查,但是我没有成功地将这些功能用于类型检查.
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }
makeLenses ''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
Run Code Online (Sandbox Code Playgroud)
我认为这种方法无论如何都有缺陷 - 有许多中间列表和转换.这对性能不利.寻求指导.
PS.据我所知,这不是重复的; 将导管与多个输入融合,因为在我的情况下,两个源都生成相同的类型,并且我不关心Packet对象的生成源,只要我不等待其中一个而另一个已经准备好消耗对象.
PPS.我为示例代码中的Lens的使用(以及因此需要知识)道歉.
我不知道这是否有任何帮助,但我尝试实施伊恩的建议,并mergeSources'在任何通道停止时立即停止:
mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c
Run Code Online (Sandbox Code Playgroud)
(这个简单的添加可以在这里找到)。
对您的版本的一些评论mergeSources(持保留态度,可能是我不太理解某些内容):
...TMChan代替...TBMChan似乎很危险。如果编写者比读者快,你的堆就会崩溃。从你的图表来看,如果你的 TCP 对等方读取数据的速度不够快,这似乎很容易发生。所以我肯定会使用...TBMChan可能很大但有限的界限。你不需要MonadSTM m约束。所有 STM 内容都包含IO在
liftSTM = liftIO . atomically
Run Code Online (Sandbox Code Playgroud)
也许这会对您在使用时略有mergeSources'帮助serverApp。
我发现这只是一个外观问题
liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
Run Code Online (Sandbox Code Playgroud)
liftA2由于它在monad 上的使用,很难阅读(->) r。我会说
do
    c <- liftSTM newTMChan
    fsrc sx c
    retn c
Run Code Online (Sandbox Code Playgroud)
会更长,但更容易阅读。
您是否可以创建一个可以使用的独立项目serverApp?