In Haskell, how can I abort a computation when a web client disconnects?

Dan*_*tin 11 haskell tcp haskell-snap-framework

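I have a Haskell-based web service that performs a computation that, for some inputs, can take a really long time to finish. ("Really long" here means more than a minute.)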

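Because that computation uses all of the CPU available on the server, I put incoming requests into a queue as they arrive (well, actually a stack, for reasons that have to do with the typical client, but that's beside the point) and serve them one at a time as the currently running computation finishes.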

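My problem is that clients don't always wait long enough: sometimes they time out on their end, disconnect, and try a different server (well, they retry, hit the ELB, and usually get a different instance). Also, because of external factors, the computation a web client asked for occasionally becomes obsolete, and the web client gets killed.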

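In those cases I would really like to detect that the web client has gone away before I pull the next request off the stack and start the (expensive) computation. Unfortunately, my experience with Snap leads me to believe that there is no way in that framework to ask "is the client's TCP connection still connected?", and I haven't found documentation for any other web framework that covers the "client disconnected" case.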
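To make the shape of the problem concrete, here is a minimal sketch (base only) of a single worker serving a stack of requests and skipping any whose client has already left. It assumes each request carries a liveness check, the hypothetical `jobAlive` field; getting such a check out of the web framework is exactly what this question asks, so the sketch only shows where the check would sit.

```haskell
import Control.Concurrent.MVar

-- Hypothetical model of a queued request. 'jobAlive' stands in for an
-- "is the client still connected?" check that the framework would have
-- to provide; 'jobRun' is the expensive computation.
data Job = Job
  { jobAlive :: IO Bool
  , jobRun   :: IO ()
  }

-- Pop the newest job (stack discipline), or Nothing if the stack is empty.
popJob :: MVar [Job] -> IO (Maybe Job)
popJob stack = modifyMVar stack $ \js -> pure $ case js of
  []         -> ([], Nothing)
  (j : rest) -> (rest, Just j)

-- Serve jobs one at a time, skipping any whose client is already gone
-- when the job reaches the top of the stack. Returns how many jobs ran.
-- A real server would block waiting for new work instead of stopping.
serveLiveJobs :: MVar [Job] -> IO Int
serveLiveJobs stack = go 0
  where
    go n = do
      next <- popJob stack
      case next of
        Nothing -> pure n
        Just j  -> do
          alive <- jobAlive j
          if alive then jobRun j >> go (n + 1) else go n
```

The check happens only at dequeue time, so it avoids starting work for departed clients but does not stop a computation that is already running.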

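So is there a Haskell web framework that makes it easy to detect whether a web client has disconnected? Or, failing that, is there one that at least makes it possible?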

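(I understand that it may not always be possible to be absolutely sure whether a TCP client is still there without sending data to the other end; however, when the client actually sends an RST packet to the server and the server's framework gives the application code no way to find out that the connection is gone, that's a problem.)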


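Incidentally, although one might suspect that Warp's onClose handler would let you do this, it only fires once the response is ready and has been written to the client, so it is useless as a way to abort a computation in progress. There also seems to be no way to get at the accepted socket in order to set SO_KEEPALIVE or similar. (There are ways to access the initial listening socket, but not the accepted one.)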

Dan*_*tin 3

So I found an answer that works for me, and it might be useful to others.

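It turns out that you can in fact modify enough of Warp's internals to do this, but what you are left with is a bare-bones version of Warp; if you need things like logging, you'll have to add other packages on top of it.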

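Also note that so-called "half-closed" connections (where the client has closed its sending side but is still waiting for data) will be detected as closed, which will interrupt your computation. I don't know of any HTTP client that uses half-closed connections, but it is something to be aware of.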

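Anyway, what I did was first copy the functions runSettings and runSettingsSocket exposed by Network.Wai.Handler.Warp and Network.Wai.Handler.Warp.Internal, and make versions that call a function I supply instead of WarpI.socketConnection, so that I have the signature: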

runSettings' :: Warp.Settings -> (Socket -> IO (IO WarpI.Connection))
             -> Wai.Application -> IO ()

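This required copying a few helper functions as well, such as setSocketCloseOnExec and windowsThreadBlockHack. The double-IO in that signature may look odd, but it is exactly what you want: the outer IO runs in the main thread (the one that calls accept), and the inner IO runs in the per-connection thread that is forked after accept returns. The original Warp function runSettings is equivalent to: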

\set -> runSettings' set (WarpI.socketConnection >=> return . return)
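The double-IO shape can be modelled without Warp at all. In this base-only sketch, strings stand in for accepted sockets and `serveAll` is a hypothetical accept loop, not Warp's: the outer action runs in the calling (accepting) thread, and the inner action it returns runs in the thread forked for that connection. `liftConn`, also hypothetical, is the same `>=> return . return` trick used above to recover the ordinary single-IO behaviour.

```haskell
import Control.Concurrent
import Control.Monad (forM, (>=>))

-- Hypothetical accept loop over already-"accepted" sockets.
-- The outer IO from 'mkConn' runs here, in the accepting thread;
-- the inner IO it returns runs in the thread forked per connection.
serveAll :: [s] -> (s -> IO (IO c)) -> (c -> IO ()) -> IO ()
serveAll socks mkConn handler = do
  dones <- forM socks $ \s -> do
    setup <- mkConn s                      -- outer IO: accept thread
    done  <- newEmptyMVar
    _ <- forkFinally (setup >>= handler)   -- inner IO: connection thread
                     (\_ -> putMVar done ())
    pure done
  mapM_ takeMVar dones                     -- wait for every connection thread

-- Do all the work in the outer IO and make the inner IO a plain 'return',
-- mirroring runSettings' set (WarpI.socketConnection >=> return . return).
liftConn :: (s -> IO c) -> s -> IO (IO c)
liftConn mk = mk >=> return . return
```

This split is what makes the answer's approach possible: per-connection setup such as forking a monitor thread can be placed in the inner IO, so it runs in, and can refer to, the thread that will handle that connection.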

Then I did:

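import Control.Concurrent            -- forkIO, myThreadId, threadDelay, throwTo, MVar ops
import Control.Exception (Exception, bracket_, catch)
import Control.Monad ((>=>))
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Unsafe as BU
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Foreign.Marshal.Utils (copyBytes)
import Foreign.Ptr (castPtr, plusPtr)
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Handler.Warp.Internal as WarpI

data ClientDisappeared = ClientDisappeared deriving (Show, Eq, Enum, Ord)
instance Exception ClientDisappeared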

runSettingsSignalDisconnect :: Warp.Settings -> Wai.Application -> IO ()
runSettingsSignalDisconnect set =
  runSettings' set (WarpI.socketConnection >=> return . wrapConn)
  where
    -- Fork a 'monitor' thread that does nothing but attempt to
    -- perform a read from conn in a loop 1/sec, and wrap the receive
    -- methods on conn so that they first consume from the stuff read
    -- by the monitoring thread. If the monitoring thread sees
    -- end-of-file (signaled by an empty string read), raise
    -- ClientDisappeared on the per-connection thread.
    wrapConn conn = do
      tid <- myThreadId
      nxtBstr <- newEmptyMVar :: IO (MVar ByteString)
      semaphore <- newMVar ()
      readerCount <- newIORef (0 :: Int)
      monitorThread <- forkIO (monitor tid nxtBstr semaphore readerCount)
      return $ conn {
        WarpI.connClose = throwTo monitorThread ClientDisappeared
                          >> WarpI.connClose conn
        , WarpI.connRecv = newRecv nxtBstr semaphore readerCount
        , WarpI.connRecvBuf = newRecvBuf nxtBstr semaphore readerCount
        }
      where
        newRecv :: MVar ByteString -> MVar () -> IORef Int
                -> IO ByteString
        newRecv nxtBstr sem readerCount =
          bracket_
          (atomicModifyIORef' readerCount $ \x -> (succ x, ()))
          (atomicModifyIORef' readerCount $ \x -> (pred x, ()))
          (withMVar sem $ \_ -> do w <- tryTakeMVar nxtBstr
                                   case w of
                                     Just w' -> return w'
                                     Nothing -> WarpI.connRecv conn
          )

        newRecvBuf :: MVar ByteString -> MVar () -> IORef Int
                   -> WarpI.Buffer -> WarpI.BufSize -> IO Bool
        newRecvBuf nxtBstr sem readerCount buf bufSize =
          bracket_
          (atomicModifyIORef' readerCount $ \x -> (succ x, ()))
          (atomicModifyIORef' readerCount $ \x -> (pred x, ()))
          (withMVar sem $ \_ -> do
              (fulfilled, buf', bufSize') <-
                if bufSize == 0 then return (False, buf, bufSize)
                else
                  do w <- tryTakeMVar nxtBstr
                     case w of
                       Nothing -> return (False, buf, bufSize)
                       Just w' -> do
                         let wlen = B.length w'
                         if wlen > bufSize
                           then do BU.unsafeUseAsCString w' $ \cw' ->
                                     copyBytes buf (castPtr cw') bufSize
                                   putMVar nxtBstr (B.drop bufSize w')
                                   return (True, buf, 0)
                           else do BU.unsafeUseAsCString w' $ \cw' ->
                                     copyBytes buf (castPtr cw') wlen
                                   return (wlen == bufSize, plusPtr buf wlen,
                                           bufSize - wlen)
              if fulfilled then return True
                else WarpI.connRecvBuf conn buf' bufSize'
          )
        dropClientDisappeared :: ClientDisappeared -> IO ()
        dropClientDisappeared _ = return ()
        monitor tid nxtBstr sem st =
          catch (monitor' tid nxtBstr sem st) dropClientDisappeared

        monitor' tid nxtBstr sem st = do
          (hitEOF, readerCount) <- withMVar sem $ \_ -> do
            w <- tryTakeMVar nxtBstr
            case w of
              -- No one picked up our bytestring from last time
              Just w' -> putMVar nxtBstr w' >> return (False, 0)
              Nothing -> do
                w <- WarpI.connRecv conn
                putMVar nxtBstr w
                readerCount <- readIORef st
                return (B.null w, readerCount)
          if hitEOF && (readerCount == 0)
            -- Don't signal if main thread is also trying to read -
            -- in that case, main thread will see EOF directly
            then throwTo tid ClientDisappeared
            else do threadDelay oneSecondInMicros
                    monitor' tid nxtBstr sem st
        oneSecondInMicros = 1000000
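Stripped of Warp's internals, the key mechanism above is a monitor thread that polls the connection and, on EOF, uses throwTo to raise ClientDisappeared in the thread doing the work. The base-only sketch below isolates just that interruption pattern. `probe` and `withDisconnectWatch` are hypothetical names: `probe` stands in for a read on the client socket that reports EOF, and unlike the code above the sketch does not keep the bytes it reads, so it illustrates the idea rather than replacing the Warp wrapper.

```haskell
import Control.Concurrent
import Control.Exception

data ClientDisappeared = ClientDisappeared deriving Show
instance Exception ClientDisappeared

-- Run 'work' in the current thread while a monitor thread calls 'probe'
-- every 'pollMicros' microseconds. When 'probe' returns True (in the real
-- code: a read on the connection returned EOF), the monitor throws
-- ClientDisappeared to the working thread, aborting 'work'.
withDisconnectWatch :: IO Bool -> Int -> IO a -> IO (Either ClientDisappeared a)
withDisconnectWatch probe pollMicros work = do
  worker <- myThreadId
  let monitorLoop = do
        eof <- probe
        if eof
          then throwTo worker ClientDisappeared
          else threadDelay pollMicros >> monitorLoop
  -- bracket kills the monitor once 'work' ends, normally or not;
  -- try turns the asynchronous ClientDisappeared into a Left.
  try (bracket (forkIO monitorLoop) killThread (const work))
```

With a probe that reports EOF on its third call, a long-running action is cut short and the result is `Left ClientDisappeared`; with a probe that never fires, the action's result comes back as `Right`. Because the exception arrives asynchronously, the interrupted work should release anything it holds with bracket or similar.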