在 Haskell 中处理事件流

maj*_*zak 5 concurrency haskell mqtt event-stream-processing

我想处理通过 MQTT 接收到的事件流。我正在使用的库使用回调来提供结果。我正在做的处理取决于以前的状态,而不仅仅是最新的事件。此外,未来的事件可能会从其他来源收集。

一开始我决定把它组合成一个听起来是个好主意的清单。我有一个小问题,因为 IO 阻止了延迟评估并且等待无限流可能很长,但我用交错 IO 解决了它。

stream :: IO [Event]允许我做的不错的东西一样foldlfoldM mapmapM,等...不幸的是这种方法我宁愿将无法在两个流合并,原因是没有更多的锁定功能存在。

我正在挖掘许多库,例如找到了带有 TQueue 的 STM。不幸的是,这不是我真正想要的。

我决定创建自定义类型并制作它,Foldable以便我能够折叠它。由于 IO,我失败了。

import Control.Concurrent.STM

newtype Stream a = Stream (STM a)

runStream
  :: ((a -> IO ()) -> IO i)
  -> IO (Stream a)
runStream block = do
  queue <- newTQueueIO
  block (atomically . writeTQueue queue)
  return $ Stream (readTQueue queue)

foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
  n <- atomically read
  m <- f n s
  foldStream f m (Stream read)

mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read

zipStream :: [Stream a] -> Stream a
zipStream = undefined
Run Code Online (Sandbox Code Playgroud)

可以像这样使用 main = foldStream (\x _ -> print x) () =<< events

是否可以像使用常规列表一样实现一些基类来处理此流?

dan*_*iaz 4

在这些情况下,通常的技巧是将回调写入队列,然后从队列的另一端读取。

\n\n

使用stm-chans包中的有界、可关闭队列,我们​​可以定义此函数:

\n\n
import Control.Concurrent.STM\nimport Control.Concurrent.STM.TBMQueue\n\nfoldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b\nfoldQueue queue step start done =\n    let go state = \n            do m <- atomically (readTBMQueue queue)\n               case m of \n                   Nothing -> done state\n                   Just a  -> step state a >>= go\n     in start >>= go\n
Run Code Online (Sandbox Code Playgroud)\n\n

它需要通道、一个步骤函数(类似于 所需的函数foldM)、一个获取初始状态的操作以及一个返回最终结果的“完成”操作,然后从通道提供数据,直到通道关闭。请注意,折叠状态x是由 的调用者选择的foldQueue

\n\n

如果以后我们想从foldl包\xe2\x80\x94升级到monadic Folds,它有一个非常有用的Applicative实例\xe2\x80\x94,我们可以这样做:

\n\n
import qualified Control.Foldl as L\n\nfoldQueue\' :: TBMQueue a -> L.FoldM IO a b -> IO b \nfoldQueue\' queue = L.impurely (foldQueue queue)\n
Run Code Online (Sandbox Code Playgroud)\n\n

impurely从“foldl”包中使用。

\n\n

有时(例如解析、分组或解码时)使用基于拉取的消费者会更容易。我们可以使用流媒体包来做到这一点:

\n\n
import Streaming\nimport qualified Streaming.Prelude as S\n\nfoldQueue\' :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r\nfoldQueue\' queue consume = consume (S.untilRight (do\n    m <- atomically (readTBMQueue queue)\n    return (case m of\n        Nothing -> Right ()\n        Just a -> Left a)))\n
Run Code Online (Sandbox Code Playgroud)\n\n

给定一个使用流的函数,我们向它提供从队列中读取的值流。

\n\n

通常,从通道读取数据和向通道写入数据必须在不同的线程中进行。我们可以使用asyncconcurrently之类的函数来干净地处理它。

\n