Dan*_*tin 11 haskell tcp haskell-snap-framework
我有一个基于Haskell的Web服务,它执行一些计算,对于某些输入可能需要很长时间才能完成.("真的很长"这里意味着超过一分钟)
因为执行该计算会占用服务器上所有可用的CPU,所以我将传入的请求放在一个队列中(好吧,实际上是一个堆栈,原因与典型的客户端有关,但除了这一点之外)当它们到达并为它们提供服务时当前正在运行的计算结束.
我的问题是,客户端并不总是等待足够长的时间,有时会超时,断开连接并尝试使用不同的服务器(好吧,他们再次尝试并点击elb,通常会获得不同的实例).此外,由于外部因素,Web客户端偶尔要求的计算将会过时,Web客户端将被终止.
在那些情况下,我真的希望能够在我将下一个请求从堆栈中拉出并开始(昂贵的)计算之前检测到Web客户端已经消失.不幸的是,我使用snap的经验让我相信在该框架中无法询问"客户端的TCP连接是否仍然连接?" 我还没有找到任何涵盖"客户端断开连接"案例的其他Web框架的文档.
那么是否有一个Haskell Web框架,可以很容易地检测Web客户端是否已断开连接?或者失败了,是否有一个至少使它成为可能?
(我知道在所有情况下都可能无法绝对确定TCP客户端是否仍在那里而不向另一端发送数据;但是,当客户端实际向服务器发送RST数据包而服务器的框架没有让应用程序代码确定连接已经消失,这是一个问题)
顺便说一下,虽然有人可能怀疑warp的 onClose处理程序会让你这样做,但只有当响应准备好并写入客户端时才会触发,因此作为中止正在进行的计算的方法是无用的.似乎也无法访问已接受的套接字以便设置SO_KEEPALIVE或类似.(有一些方法可以访问初始侦听套接字,但不是接受的套接字)
所以我找到了一个适合我的答案,也可能对其他人有用。
事实证明,您实际上可以对 Warp 的内部结构进行足够多的修改来做到这一点,但是您剩下的只是 Warp 的基本版本,如果您需要诸如日志记录等之类的东西,则需要添加其他内容包到此。
另请注意,所谓的“半关闭”连接(当客户端关闭其发送端,但仍在等待数据时)将被检测为关闭,从而中断您的计算。我不知道有任何 HTTP 客户端处理半封闭连接,但只是需要注意一些事情。
无论如何,我所做的是首先复制函数runSettings并由和runSettingsSocket公开,并制作调用我提供的函数而不是 的版本,以便我拥有签名:Network.Wai.Handler.WarpNetwork.Wai.Handler.Warp.InternalWarpI.socketConnection
runSettings' :: Warp.Settings -> (Socket -> IO (IO WarpI.Connection))
-> Wai.Application -> IO ()
Run Code Online (Sandbox Code Playgroud)
这需要复制一些辅助方法,例如setSocketCloseOnExec和windowsThreadBlockHack。那里的双重IO签名可能看起来很奇怪,但这正是您想要的 - 外部IO在主线程(调用accept)中运行,内部在返回IO后分叉的每个连接线程中运行accept。原Warp函数runSettings等价于:
\set -> runSettings' set (WarpI.socketConnection >=> return . return)
Run Code Online (Sandbox Code Playgroud)
然后我做了:
data ClientDisappeared = ClientDisappeared deriving (Show, Eq, Enum, Ord)
instance Exception ClientDisappeared
runSettingsSignalDisconnect :: Warp.Settings -> Wai.Application -> IO ()
runSettingsSignalDisconnect set =
runSettings' set (WarpI.socketConnection >=> return . wrapConn)
where
-- Fork a 'monitor' thread that does nothing but attempt to
-- perform a read from conn in a loop 1/sec, and wrap the receive
-- methods on conn so that they first consume from the stuff read
-- by the monitoring thread. If the monitoring thread sees
-- end-of-file (signaled by an empty string read), raise
-- ClientDisappered on the per-connection thread.
wrapConn conn = do
tid <- myThreadId
nxtBstr <- newEmptyMVar :: IO (MVar ByteString)
semaphore <- newMVar ()
readerCount <- newIORef (0 :: Int)
monitorThread <- forkIO (monitor tid nxtBstr semaphore readerCount)
return $ conn {
WarpI.connClose = throwTo monitorThread ClientDisappeared
>> WarpI.connClose conn
, WarpI.connRecv = newRecv nxtBstr semaphore readerCount
, WarpI.connRecvBuf = newRecvBuf nxtBstr semaphore readerCount
}
where
newRecv :: MVar ByteString -> MVar () -> IORef Int
-> IO ByteString
newRecv nxtBstr sem readerCount =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do w <- tryTakeMVar nxtBstr
case w of
Just w' -> return w'
Nothing -> WarpI.connRecv conn
)
newRecvBuf :: MVar ByteString -> MVar () -> IORef Int
-> WarpI.Buffer -> WarpI.BufSize -> IO Bool
newRecvBuf nxtBstr sem readerCount buf bufSize =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do
(fulfilled, buf', bufSize') <-
if bufSize == 0 then return (False, buf, bufSize)
else
do w <- tryTakeMVar nxtBstr
case w of
Nothing -> return (False, buf, bufSize)
Just w' -> do
let wlen = B.length w'
if wlen > bufSize
then do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') bufSize
putMVar nxtBstr (B.drop bufSize w')
return (True, buf, 0)
else do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') wlen
return (wlen == bufSize, plusPtr buf wlen,
bufSize - wlen)
if fulfilled then return True
else WarpI.connRecvBuf conn buf' bufSize'
)
dropClientDisappeared :: ClientDisappeared -> IO ()
dropClientDisappeared _ = return ()
monitor tid nxtBstr sem st =
catch (monitor' tid nxtBstr sem st) dropClientDisappeared
monitor' tid nxtBstr sem st = do
(hitEOF, readerCount) <- withMVar sem $ \_ -> do
w <- tryTakeMVar nxtBstr
case w of
-- No one picked up our bytestring from last time
Just w' -> putMVar nxtBstr w' >> return (False, 0)
Nothing -> do
w <- WarpI.connRecv conn
putMVar nxtBstr w
readerCount <- readIORef st
return (B.null w, readerCount)
if hitEOF && (readerCount == 0)
-- Don't signal if main thread is also trying to read -
-- in that case, main thread will see EOF directly
then throwTo tid ClientDisappeared
else do threadDelay oneSecondInMicros
monitor' tid nxtBstr sem st
oneSecondInMicros = 1000000
Run Code Online (Sandbox Code Playgroud)