我见过人们为各种懒惰的IO相关任务推荐管道/管道库.这些库到底解决了什么问题?
此外,当我尝试使用一些与hackage相关的库时,很可能有三个不同的版本.例:
这让我很困惑.对于我的解析任务,我应该使用attoparsec或pipes-attoparsec/attoparsec-conduit?与普通香草attoparsec相比,管道/导管版本给我带来了什么好处?
问题
你好!我正在编写一个日志库,我很乐意创建一个在单独的线程中运行的记录器,而所有应用程序线程都只是向它发送消息.我想为这个问题找到性能最佳的解决方案.我在这里需要简单的unboud队列.
途径
我已经创建了一些测试来查看可用解决方案的执行情况,我在这里得到了非常奇怪的结果.我测试了4个实现(下面提供的源代码)基于:
测试
以下是用于测试的源代码:
{-# LANGUAGE NoMonomorphismRestriction #-}
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)
import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main
data Event = Msg String | Status | Quit deriving (Show)
----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------
pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg
pipesAddProducer num …
Run Code Online (Sandbox Code Playgroud) 在我看来,这两个想法之间存在着密切的联系.我的猜测是,如果有一种方法可以用迭代器表示任意图形,那么FRP可以用Iteratees来实现.但是afaik他们只支持链式结构.
有人可以对此有所了解吗?
我试图了解导管和管道之间的差异.与管道不同,管道具有剩余物的概念.什么是剩菜有用?我想看一些剩菜必不可少的例子.
由于管道没有剩余的概念,有没有办法与它们实现类似的行为?
假设我有简单的生产者/消费者模型,消费者想要将一些状态传递给生产者.例如,让下游流动的对象成为我们想要写入文件的对象,上游对象是表示在文件中写入对象的位置的一些标记(例如,偏移).
这两个过程可能看起来像这样(带pipes-4.0
),
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
import Pipes
import Pipes.Core
import Control.Monad.Trans.State
import Control.Monad
newtype Object = Obj Int
deriving (Show)
newtype ObjectId = ObjId Int
deriving (Show, Num)
writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
where go = do i <- get
obj <- lift $ request i
lift $ lift $ putStrLn $ "Wrote "++show obj
modify (+1)
produceObjects :: [Object] -> Proxy X () ObjectId Object …
Run Code Online (Sandbox Code Playgroud) 我正在看一下用于流处理的管道3.0包.该教程非常好,非常清楚,除了我无法绕过"zip和merge"部分.
我的目标是结合管道有点像ArrowChoice允许做:
+----------+ +------+ - filterLeft -> pipe1 -> +------------+
| producer | - (Either a a) -> | fork | | mergeD (?) |
+----------+ +------+ - filterRight -> pipe2 -> +------------+
Run Code Online (Sandbox Code Playgroud)
我fork
在教程中定义:
fork () =
runIdentityP . hoist (runIdentityP . hoist runIdentityP) $ forever $ do
a <- request ()
lift $ respond a
lift $ lift $ respond a
oddOrEven x = if odd x then Left x else …
Run Code Online (Sandbox Code Playgroud) A Pipe
可以分为两部分:生成器部分(yield
)和消费者部分(await
).
如果你有一个Pipe
只使用它的生成器一半,并且只返回()
(或永不返回),那么它可以表示为" ListT
完成权限".事实证明,MonadPlus
它可以用来代表像ListT-done-right这样的东西.
http://www.reddit.com/r/haskell/comments/2bpsh7/a_simple_monadic_stream_library/cj7sqtw?context=3
所以我的问题是:对于Pipes的消费者部分,ListT和MonadPlus是否存在双重性?
要求:
yield
,只返回()
(或永不返回)但确实使用的管道await
可以表示为"对ListT"的双重.我试图理解管道概念的不同实现之间的差异.导管和管道之间的区别之一是它们如何将管道熔合在一起.管道有
(>+>) :: Monad m
=> Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
Run Code Online (Sandbox Code Playgroud)
而管有
(>->) :: (Monad m, Proxy p)
=> (b' -> p a' a b' b m r) -> (c' -> p b' b c' c m r) -> c' -> p a' a c' c m r
Run Code Online (Sandbox Code Playgroud)
如果我理解正确,使用管道,当两个管道的任何管道停止时,返回其结果而停止另一个.使用导管,如果左侧管道完成,其结果将向下游发送到右侧管道.
我想知道, …
以下程序的存储器分析表明,noleak函数在常量存储器中运行,而泄漏函数以线性方式泄漏存储器.dflemstr表明这可能是由于RWST导致无限的分配链.是这种情况还有其他解决方案吗?我实际上不需要Writer monad.
环境:
ARCH 64位GHC 7.8.3
ghc Pipe.hs -o Pipe -prof
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import Control.Monad.Trans.RWS.Strict
main = leak
effectLeak :: Effect (RWST () () () IO) ()
effectLeak =
(forever $ do
liftIO . threadDelay $ 10000 * 1
yield "Space") >->
(forever $ do
text <- await
yield $ text ++ (" leak" :: String)) >->
(forever $ do
text <- await
liftIO . print $ text
)
effectNoleak :: Effect IO () …
Run Code Online (Sandbox Code Playgroud) 我有一个Producer
创建依赖于随机性的值,使用我自己的Random
monad:
policies :: Producer (Policy s a) Random x
Run Code Online (Sandbox Code Playgroud)
Random
是一个mwc-random
可以从ST
或运行的包装器IO
:
newtype Random a =
Random (forall m. PrimMonad m => Gen (PrimState m) -> m a)
runIO :: Random a -> IO a
runIO (Random r) = MWC.withSystemRandom (r @ IO)
Run Code Online (Sandbox Code Playgroud)
该policies
生产商的产量从一个简单的强化学习算法好政策.
通过索引到policies
以下内容,我可以在5,000,000次迭代后有效地绘制策略:
Just convergedPolicy <- Random.runIO $ Pipes.index 5000000 policies
plotPolicy convergedPolicy "policy.svg"
Run Code Online (Sandbox Code Playgroud)
我现在想要在每500,000步骤上绘制中间策略,以了解它们如何收敛.我写了几个函数,它们接受policies
生产者并提取一个列表([Policy s a]
),例如10个策略 - 每500,000次迭代一次 - 然后绘制所有这些函数. …
haskell ×10
haskell-pipes ×10
conduit ×4
iterate ×2
concurrency ×1
frp ×1
pipe ×1
profiling ×1
stm ×1