maj*_*zak 5 concurrency haskell mqtt event-stream-processing
我想处理通过 MQTT 接收到的事件流。我正在使用的库使用回调来提供结果。我正在做的处理取决于以前的状态,而不仅仅是最新的事件。此外,未来的事件可能会从其他来源收集。
一开始我决定把它组合成一个听起来是个好主意的清单。我有一个小问题,因为 IO 阻止了延迟评估并且等待无限流可能很长,但我用交错 IO 解决了它。
stream :: IO [Event]允许我做的不错的东西一样foldl,foldM map,mapM,等...不幸的是这种方法我宁愿将无法在两个流合并,原因是没有更多的锁定功能存在。
我正在挖掘许多库,例如找到了带有 TQueue 的 STM。不幸的是,这不是我真正想要的。
我决定创建自定义类型并制作它,Foldable以便我能够折叠它。由于 IO,我失败了。
import Control.Concurrent.STM
newtype Stream a = Stream (STM a)
runStream
:: ((a -> IO ()) -> IO i)
-> IO (Stream a)
runStream block = do
queue <- newTQueueIO
block (atomically . writeTQueue queue)
return $ Stream (readTQueue queue)
foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
n <- atomically read
m <- f n s
foldStream f m (Stream read)
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read
zipStream :: [Stream a] -> Stream a
zipStream = undefined
Run Code Online (Sandbox Code Playgroud)
可以像这样使用 main = foldStream (\x _ -> print x) () =<< events
是否可以像使用常规列表一样实现一些基类来处理此流?
在这些情况下,通常的技巧是将回调写入队列,然后从队列的另一端读取。
\n\n使用stm-chans包中的有界、可关闭队列,我们可以定义此函数:
\n\nimport Control.Concurrent.STM\nimport Control.Concurrent.STM.TBMQueue\n\nfoldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b\nfoldQueue queue step start done =\n let go state = \n do m <- atomically (readTBMQueue queue)\n case m of \n Nothing -> done state\n Just a -> step state a >>= go\n in start >>= go\nRun Code Online (Sandbox Code Playgroud)\n\n它需要通道、一个步骤函数(类似于 所需的函数foldM)、一个获取初始状态的操作以及一个返回最终结果的“完成”操作,然后从通道提供数据,直到通道关闭。请注意,折叠状态x是由 的调用者选择的foldQueue。
如果以后我们想从foldl包\xe2\x80\x94升级到monadic Folds,它有一个非常有用的Applicative实例\xe2\x80\x94,我们可以这样做:
import qualified Control.Foldl as L\n\nfoldQueue\' :: TBMQueue a -> L.FoldM IO a b -> IO b \nfoldQueue\' queue = L.impurely (foldQueue queue)\nRun Code Online (Sandbox Code Playgroud)\n\nimpurely从“foldl”包中使用。
有时(例如解析、分组或解码时)使用基于拉取的消费者会更容易。我们可以使用流媒体包来做到这一点:
\n\nimport Streaming\nimport qualified Streaming.Prelude as S\n\nfoldQueue\' :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r\nfoldQueue\' queue consume = consume (S.untilRight (do\n m <- atomically (readTBMQueue queue)\n return (case m of\n Nothing -> Right ()\n Just a -> Left a)))\nRun Code Online (Sandbox Code Playgroud)\n\n给定一个使用流的函数,我们向它提供从队列中读取的值流。
\n\n通常,从通道读取数据和向通道写入数据必须在不同的线程中进行。我们可以使用asyncconcurrently之类的函数来干净地处理它。
| 归档时间: |
|
| 查看次数: |
323 次 |
| 最近记录: |