dfl*_*str 11 sockets concurrency events haskell
我想使用" epoll"式事件管理实现高效的单线程套接字通信.
如果我"从头开始"编写一个非常强制性的程序,我会基本上这样做(只是一些我刚输入的伪代码 - 可能不会编译):
import Control.Concurrent
import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString
import qualified GHC.Event as Event
import Network
import Network.Socket
import Network.Socket.ByteString
main = withSocketFromSomewhere $ \ socket -> do
let fd = fromIntegral . fdSocket $ socket
-- Some app logic
state <- newMVar "Bla"
-- Event manager
manager <- Event.new
-- Do an initial write
initialWrite socket state manager
-- Manager does its thing
Event.loop manager
write manager socket bs =
-- Should be pretty straight-forward
Event.registerFd manager theWrite fd Event.evtWrite
where
fd = fromIntegral . fdSocket $ socket
theWrite key _ = do
Event.unregisterFd manager key
sendAll socket bs
read manager socket cont =
-- Ditto
Event.registerFd manager theRead fd Event.evtRead
where
fd = fromIntegral . fdSocket $ socket
theRead key _ = do
Event.unregisterFd manager key
bs <- recv socket 4096
cont bs
initialWrite socket state manager = do
msg <- readMVar state
write manager socket msg
read manager socket $ \ bs -> do
ByteString.putStrLn bs
putMVar state msg
Run Code Online (Sandbox Code Playgroud)
想象一下,还有一些函数可以向管理器添加超时事件,等等.
但是,这个代码不是特别好,原因如下:
MVarfor我的应用程序逻辑,因为我无法告诉opaque事件管理器它应该为我传递一些状态,即使我知道它只使用一个线程,因此可能被用作基础的monad变压器堆栈.现在,这只是尖叫着使用过多的monad变换器等等.我希望能够这样做:
main =
withSocketFromSomewhere $ \ socket ->
runEvents . flip runStateT "Bla" $ initialWrite socket
initialWrite socket = do
msg <- lift get
write socket msg
resp <- read socket
liftIO $ ByteString.putStrLn resp
lift $ put msg
Run Code Online (Sandbox Code Playgroud)
此代码应具有与上述代码相同的行为; 例如,通过暂停计算直到resp <- read socket在线路上接收到读取,并让我在同一线程/管理器上管理多个套接字.
问题:
Gab*_*lez 19
我强烈建议您阅读基于语言的方法来统一事件和线程.它讨论了如何在您选择的IO子系统之上构建您想要的任何并发系统,并且在他们的论文中他们实际上是在它上面实现它epoll.
不幸的是,论文中的数据类型和代码示例非常差,并且花了一些时间(至少对我来说)对其代码进行逆向工程,甚至在他们的论文中也存在一些错误.然而,他们的方法实际上是一个非常强大的通用方法的一个子集,称为"免费monad".
例如,他们的Trace数据类型只是伪装的免费monad.为了了解原因,让我们参考Haskell对自由monad的定义:
data Free f r = Pure r | Free (f (Free f r))
Run Code Online (Sandbox Code Playgroud)
一个免费的monad就像一个"functor列表",Pure类似于list的Nil构造函数,Free类似于list的 Cons构造函数,因为它在"list"上添加了一个额外的functor.从技术上讲,如果我是迂腐的,没有任何东西可以说免费的monad必须被实现为上面列出的数据类型,但是你实现的任何东西都必须与上面的数据类型同构.
关于免费monad的好处是,给定一个仿函数f,Free f它自动成为monad:
instance (Functor f) => Monad (Free f) where
return = Pure
Pure r >>= f = f r
Free x >>= f = Free (fmap (>>= f) x)
Run Code Online (Sandbox Code Playgroud)
这意味着我们可以将他们的Trace数据类型分解为两个部分,基本仿函数f,然后是由f以下公式生成的免费monad :
-- The base functor
data TraceF x =
SYS_NBIO (IO x)
| SYS_FORK x x
| SYS_YIELD x
| SYS_RET
| SYS_EPOLL_WAIT FD EPOLL_EVENT x
-- You can even skip this definition if you use the GHC
-- "DerivingFunctor" extension
instance Functor TraceF where
fmap f (SYS_NBIO x) = SYS_NBIO (liftM f x)
fmap f (SYS_FORK x) = SYS_FORK (f x) (f x)
fmap f (SYS_YIELD x) = SYS_YIELD (f x)
fmap f SYS_RET = SYS_RET
fmap f (SYS_EPOLL_WAIT FD EPOLL_EVENT x) = SYS_EPOLL_WAIT FD EPOLL_EVEN (f x)
Run Code Online (Sandbox Code Playgroud)
鉴于这个仿函数,你可以Trace"免费" 获得monad:
type Trace a = Free TraceF a
-- or: type Trace = Free TraceF
Run Code Online (Sandbox Code Playgroud)
...虽然这不是为什么它被称为"自由"monad.
然后更容易定义它们的所有功能:
liftF = Free . fmap Pure
-- if "Free f" is like a list of "f", then
-- this is sort of like: "liftF x = [x]"
-- it's just a convenience function
-- their definitions are written in continuation-passing style,
-- presumably for efficiency, but they are equivalent to these
sys_nbio io = liftF (SYS_NBIO io)
sys_fork t = SYS_FORK t (return ()) -- intentionally didn't use liftF
sys_yield = liftF (SYS_YIELD ())
sys_ret = liftF SYS_RET
sys_epoll_wait fd event = liftF (SYS_EPOLL_WAIT fd event ())
Run Code Online (Sandbox Code Playgroud)
那么你可以像monad一样使用这些命令:
myTrace fd event = do
sys_nbio (putStrLn "Hello, world")
fork $ do
sys_nbio (putStrLn "Hey")
sys_expoll_wait fd event
Run Code Online (Sandbox Code Playgroud)
现在,这是关键概念.我刚写的那个monad只创建了一个数据类型.而已.它根本不解释它.就像你为表达式编写抽象语法树一样.这完全取决于您如何评估它.在论文中,他们给出了表达式的解释器的具体示例,但编写自己的解释器是微不足道的.
重要的概念是这个解释器可以运行在你想要的任何monad中.因此,如果您想通过并发处理某些状态,则可以执行此操作.例如,这是一个玩具解释器,它使用StateT IO monad来跟踪IO动作被调用的次数:
interpret t = case t of
SYS_NBIO io -> do
modify (+1)
t' <- lift io
interpret t'
...
Run Code Online (Sandbox Code Playgroud)
您甚至可以跨forkIO的操作线程monad!这是我的一些非常古老的代码,它是有缺陷和跛脚的,因为它是在我经验不足并且不知道什么是免费的monad时被写回来的,但它在行动中证明了这一点:
module Thread (Thread(..), done, lift, branch, fork, run) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Cont
import Data.Sequence
import qualified Data.Foldable as F
data Thread f m =
Done
| Lift (m (Thread f m))
| LiftIO (IO (Thread f m))
| Branch (f (Thread f m))
| Exit
done = cont $ \c -> Done
lift' x = cont $ \c -> Lift $ liftM c x
liftIO' x = cont $ \c -> LiftIO $ liftM c x
branch x = cont $ \c -> Branch $ fmap c x
exit = cont $ \c -> Exit
fork x = join $ branch [return (), x >> done]
run x = do
q <- liftIO $ newTChanIO
enqueue q $ runCont x $ \_ -> Done
loop q
where
loop q = do
t <- liftIO $ atomically $ readTChan q
case t of
Exit -> return ()
Done -> loop q
Branch ft -> mapM_ (enqueue q) ft >> loop q
Lift mt -> (mt >>= enqueue q) >> loop q
LiftIO it -> (liftIO $ forkIO $ it >>= enqueue q) >> loop q
enqueue q = liftIO . atomically . writeTChan q
Run Code Online (Sandbox Code Playgroud)
免费monad背后的一点是,他们提供monad实例和NOTHING ELSE.换句话说,他们退后一步,让你完全自由地想要解释它们,这就是为什么它们非常有用.
| 归档时间: |
|
| 查看次数: |
1536 次 |
| 最近记录: |