"Monad-friendly"基于事件的IO

dfl*_*str 11 sockets concurrency events haskell

我想使用" epoll"式事件管理实现高效的单线程套接字通信.

如果我"从头开始"编写一个非常强制性的程序,我会基本上这样做(只是一些我刚输入的伪代码 - 可能不会编译):

import Control.Concurrent

import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString

import qualified GHC.Event as Event

import Network
import Network.Socket
import Network.Socket.ByteString

main = withSocketFromSomewhere $ \ socket -> do
  let fd = fromIntegral . fdSocket $ socket

  -- Some app logic
  state <- newMVar "Bla"

  -- Event manager
  manager <- Event.new

  -- Do an initial write
  initialWrite socket state manager

  -- Manager does its thing
  Event.loop manager

write manager socket bs =
  -- Should be pretty straight-forward
  Event.registerFd manager theWrite fd Event.evtWrite
  where
    fd = fromIntegral . fdSocket $ socket
    theWrite key _ = do
      Event.unregisterFd manager key
      sendAll socket bs

read manager socket cont =
  -- Ditto
  Event.registerFd manager theRead fd Event.evtRead
  where
    fd = fromIntegral . fdSocket $ socket
    theRead key _ = do
      Event.unregisterFd manager key
      bs <- recv socket 4096
      cont bs

initialWrite socket state manager = do
  msg <- readMVar state
  write manager socket msg
  read manager socket $ \ bs -> do
    ByteString.putStrLn bs
    putMVar state msg
Run Code Online (Sandbox Code Playgroud)

想象一下,还有一些函数可以向管理器添加超时事件,等等.

但是,这个代码不是特别好,原因如下:

  1. 我手动携带事件管理器.
  2. 我必须使用MVarfor我的应用程序逻辑,因为我无法告诉opaque事件管理器它应该为我传递一些状态,即使我知道它只使用一个线程,因此可能被用作基础的monad变压器堆栈.
  3. 我必须为读取创建明确的分隔连续(我甚至可能必须为写入执行此操作;我不知道在这种情况下会更明智).

现在,这只是尖叫着使用过多的monad变换器等等.我希望能够这样做:

main =
  withSocketFromSomewhere $ \ socket ->
  runEvents . flip runStateT "Bla" $ initialWrite socket

initialWrite socket = do
  msg <- lift get
  write socket msg
  resp <- read socket
  liftIO $ ByteString.putStrLn resp
  lift $ put msg
Run Code Online (Sandbox Code Playgroud)

此代码应具有与上述代码相同的行为; 例如,通过暂停计算直到resp <- read socket在线路上接收到读取,并让我在同一线程/管理器上管理多个套接字.

问题:

  1. 是否有更高级别的GHC事件API/libevent /等效接口,为用户提供更多功能?考虑到最近的GHC中发生的同步IO调度变化(我在7.4.1),它是否值得呢?
  2. 如果我想实现协同并发,例如,有一个函数总是处理来自套接字的读取,但是这个函数与写"线程"共享相同的状态monad,该怎么办?这可以通过(1)的任何解决方案来完成吗?

Gab*_*lez 19

我强烈建议您阅读基于语言的方法来统一事件和线程.它讨论了如何在您选择的IO子系统之上构建您想要的任何并发系统,并且在他们的论文中他们实际上是在它上面实现它epoll.

不幸的是,论文中的数据类型和代码示例非常差,并且花了一些时间(至少对我来说)对其代码进行逆向工程,甚至在他们的论文中也存在一些错误.然而,他们的方法实际上是一个非常强大的通用方法的一个子集,称为"免费monad".

例如,他们的Trace数据类型只是伪装的免费monad.为了了解原因,让我们参考Haskell对自由monad的定义:

data Free f r = Pure r | Free (f (Free f r))
Run Code Online (Sandbox Code Playgroud)

一个免费的monad就像一个"functor列表",Pure类似于list的Nil构造函数,Free类似于list的 Cons构造函数,因为它在"list"上添加了一个额外的functor.从技术上讲,如果我是迂腐的,没有任何东西可以说免费的monad必须被实现为上面列出的数据类型,但是你实现的任何东西都必须与上面的数据类型同构.

关于免费monad的好处是,给定一个仿函数f,Free f它自动成为monad:

instance (Functor f) => Monad (Free f) where
    return = Pure
    Pure r >>= f = f r
    Free x >>= f = Free (fmap (>>= f) x)
Run Code Online (Sandbox Code Playgroud)

这意味着我们可以将他们的Trace数据类型分解为两个部分,基本仿函数f,然后是由f以下公式生成的免费monad :

-- The base functor
data TraceF x =
    SYS_NBIO (IO x)
  | SYS_FORK x x
  | SYS_YIELD x
  | SYS_RET
  | SYS_EPOLL_WAIT FD EPOLL_EVENT x

-- You can even skip this definition if you use the GHC
-- "DerivingFunctor" extension
instance Functor TraceF where
    fmap f (SYS_NBIO x) = SYS_NBIO (liftM f x)
    fmap f (SYS_FORK x) = SYS_FORK (f x) (f x)
    fmap f (SYS_YIELD x) = SYS_YIELD (f x)
    fmap f SYS_RET = SYS_RET
    fmap f (SYS_EPOLL_WAIT FD EPOLL_EVENT x) = SYS_EPOLL_WAIT FD EPOLL_EVEN (f x)
Run Code Online (Sandbox Code Playgroud)

鉴于这个仿函数,你可以Trace"免费" 获得monad:

type Trace a = Free TraceF a
-- or: type Trace = Free TraceF
Run Code Online (Sandbox Code Playgroud)

...虽然这不是为什么它被称为"自由"monad.

然后更容易定义它们的所有功能:

liftF = Free . fmap Pure
-- if "Free f" is like a list of "f", then
-- this is sort of like: "liftF x = [x]"
-- it's just a convenience function

-- their definitions are written in continuation-passing style,
-- presumably for efficiency, but they are equivalent to these
sys_nbio io = liftF (SYS_NBIO io)
sys_fork t = SYS_FORK t (return ()) -- intentionally didn't use liftF
sys_yield = liftF (SYS_YIELD ())
sys_ret = liftF SYS_RET
sys_epoll_wait fd event = liftF (SYS_EPOLL_WAIT fd event ())
Run Code Online (Sandbox Code Playgroud)

那么你可以像monad一样使用这些命令:

myTrace fd event = do
    sys_nbio (putStrLn "Hello, world")
    fork $ do
        sys_nbio (putStrLn "Hey")
    sys_expoll_wait fd event
Run Code Online (Sandbox Code Playgroud)

现在,这是关键概念.我刚写的那个monad只创建了一个数据类型.而已.它根本不解释它.就像你为表达式编写抽象语法树一样.这完全取决于您如何评估它.在论文中,他们给出了表达式的解释器的具体示例,但编写自己的解释器是微不足道的.

重要的概念是这个解释器可以运行在你想要的任何monad中.因此,如果您想通过并发处理某些状态,则可以执行此操作.例如,这是一个玩具解释器,它使用StateT IO monad来跟踪IO动作被调用的次数:

interpret t = case t of
    SYS_NBIO io -> do
        modify (+1)
        t' <- lift io
        interpret t'
    ...
Run Code Online (Sandbox Code Playgroud)

您甚至可以跨forkIO的操作线程monad!这是我的一些非常古老的代码,它是有缺陷和跛脚的,因为它是在我经验不足并且不知道什么是免费的monad时被写回来的,但它在行动中证明了这一点:

module Thread (Thread(..), done, lift, branch, fork, run) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Cont
import Data.Sequence
import qualified Data.Foldable as F

data Thread f m =
    Done
  | Lift (m (Thread f m))
  | LiftIO (IO (Thread f m))
  | Branch (f (Thread f m))
  | Exit

done = cont $ \c -> Done
lift' x = cont $ \c -> Lift $ liftM c x
liftIO' x = cont $ \c -> LiftIO $ liftM c x
branch x = cont $ \c -> Branch $ fmap c x
exit = cont $ \c -> Exit

fork x = join $ branch [return (), x >> done]

run x = do
    q <- liftIO $ newTChanIO
    enqueue q $ runCont x $ \_ -> Done
    loop q
  where
    loop q = do
        t <- liftIO $ atomically $ readTChan q
        case t of
            Exit -> return ()
            Done -> loop q
            Branch ft -> mapM_ (enqueue q) ft >> loop q
            Lift mt -> (mt >>= enqueue q) >> loop q
            LiftIO it -> (liftIO $ forkIO $ it >>= enqueue q) >> loop q
    enqueue q = liftIO . atomically . writeTChan q
Run Code Online (Sandbox Code Playgroud)

免费monad背后的一点是,他们提供monad实例和NOTHING ELSE.换句话说,他们退后一步,让你完全自由地想要解释它们,这就是为什么它们非常有用.