我希望看到源的非确定性交错操作,类型签名如
interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)
Run Code Online (Sandbox Code Playgroud)
用例是我有一个p2p应用程序,它保持与网络上许多节点的开放连接,而且它主要是坐在等待来自其中任何节点的消息.当消息到达时,它不关心它来自何处,但需要尽快处理消息.理论上,这种应用程序(至少用于类似套接字的源)可以完全绕过GHC的IO管理器并运行select/ epoll/ etc.直接打电话,但我并不特别在意它是如何实现的,只要它有效.
管道可以这样吗?一种不那么通用但可能更可行的方法可能是编写一个[(k, Socket)] -> Source m (k, ByteString)处理所有套接字接收的函数.
我注意到ResumableSource管道中的操作,但它们似乎都想要了解一个特定的Sink,这感觉就像一个抽象泄漏,至少对于这个操作.
目标是使用具有以下类型签名的管道
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
Run Code Online (Sandbox Code Playgroud)
管道应重复解析ByteString -> a通过TCP/IP(使用network-conduit包)接收的协议缓冲区(使用该功能).
有线消息格式是
{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
Run Code Online (Sandbox Code Playgroud)
(花括号不是协议的一方,仅用于分隔实体).
第一个想法是使用sequenceSink重复应用Sink能够解析一个ProtoBuf:
[...]
import qualified Data.Binary as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util as CU
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
CU.sequenceSink () $ \() ->
do lenBytes <- CB.take 4 -- read protobuf …Run Code Online (Sandbox Code Playgroud) 我正在试验导管包.我还找到了network-conduit包,我试着建立一个简单的TCP客户端,它将文件的内容发送到套接字:
import Data.Conduit
import Data.Conduit.Binary
import Data.Conduit.Network
import Data.ByteString.Char8 (pack)
sendFile fileName appData = runResourceT $
sourceFile fileName $$ appSink appData
main = runTCPClient (clientSettings 8000 (pack "localhost")) (sendFile "book.tex")
Run Code Online (Sandbox Code Playgroud)
但是,这不起作用,因为app sink不属于ResourceT:
[1 of 1] Compiling Main ( Conduit2.hs, interpreted )
Conduit2.hs:9:63:
Occurs check: cannot construct the infinite type: m0 = ResourceT m0
Expected type: Application (ResourceT m0)
Actual type: AppData (ResourceT m0) -> m0 ()
In the return type of a call of `sendFile'
In the second argument of …Run Code Online (Sandbox Code Playgroud) 我非常喜欢管道/管道的概念,用于将操作应用于流式IO源.我感兴趣的是构建适用于非常大的日志文件的工具.从Python/Ruby迁移到Haskell的一个吸引人的地方是编写并行代码的简单方法,但我找不到任何这方面的文档.我怎么能设置一个管道流来读取文件中的行并且并行处理它们(即有8个核,它应该读取8行,然后将它们移交给8个不同的线程进行处理,然后再次收集等),理想情况下尽可能少的"仪式"......
可选地,可以注意线是否需要按顺序重新加入,如果这可能影响过程的速度?
我确信可以使用Parallel Haskell书中的想法自己拼凑一些东西,但在我看来,在Conduit工作流程中间并行(parmap等)运行纯函数应该非常简单?
我正在尝试制作一个从ByteStrings过滤ANSI转义码的Conduit.我想出了一个函数,它将ByteString转换为Word8的流,进行过滤,并在最后转换回ByteStream流.
当我在GHCi中使用它时似乎工作正常:
> runConduit $ yield "hello\27[23;1m world" .| ansiFilter .| printC
"hello world"
Run Code Online (Sandbox Code Playgroud)
当我在我的应用程序中使用它时,包含的管道ansiFilter似乎没有通过任何东西.这是完整的来源:
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Conduit
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Conduit.TQueue
import Data.Word8 (Word8)
import qualified Data.Word8 as Word8
main :: IO ()
main = do
queue <- atomically $ newTBQueue 25
let qSource = sourceTBQueue queue
atomically $ writeTBQueue queue ("hello" :: ByteString)
race_
(putInputIntoQueue queue)
(doConversionAndPrint qSource)
putInputIntoQueue q …Run Code Online (Sandbox Code Playgroud) 在reddit上有一个存档的线程,该线程表示管道基本上不能是箭头b / c箭头需要同步。该线程链接在这里https://www.reddit.com/r/haskell/comments/rq1q5/conduitssinks_and_refactoring_arrows/
我看不到“同步”出现的地方,因为这不是箭头定义的一部分。另外,我在github https://github.com/cmahon/interactive-brokers上偶然发现了这个项目,该项目将管道明确地视为箭头。为了方便起见,我在此处粘贴实例def。我在这里想念什么?
-- The code in this module was provided by Gabriel Gonzalez
{-# LANGUAGE RankNTypes #-}
module Pipes.Edge where
import Control.Arrow
import Control.Category (Category((.), id))
import Control.Monad ((>=>))
import Control.Monad.Trans.State.Strict (get, put)
import Pipes
import Pipes.Core (request, respond, (\>\), (/>/), push, (>~>))
import Pipes.Internal (unsafeHoist)
import Pipes.Lift (evalStateP)
import Prelude hiding ((.), id)
newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }
instance (Monad …Run Code Online (Sandbox Code Playgroud) 在Monad.Reader第19期中的“协程管道”文章中,作者定义了一个泛型Coroutine类型:
newtype Coroutine f m a = Coroutine
{ resume :: m (Either (f (Coroutine f m a)) a)
}
Run Code Online (Sandbox Code Playgroud)
data FreeF f a b = Pure a | Free (f b)
newtype FreeT f m a = FreeT
{ runFreeT :: m (FreeF f a (FreeT f m a))
}
Run Code Online (Sandbox Code Playgroud)
看来FreeT和Coroutine是同构的。这是从一个映射到另一个的函数:
freeTToCoroutine
:: forall f m a. (Functor f, Functor m) => FreeT f m a …Run Code Online (Sandbox Code Playgroud) 我正在测试此递归Haskell函数的性能,该函数重复累加一个无限列表的前100000000个整数(使用Conduit管道),并打印每次执行的经过时间:
import Conduit
import Data.Time.Clock
evaluate_listC 0 = return ()
evaluate_listC i = do
startTime <- getCurrentTime
print $ runConduitPure $ yieldMany [1..] .| takeC 100000000 .| sumC
endTime <- getCurrentTime
print $ diffUTCTime endTime startTime
evaluate_listC (i-1)
Run Code Online (Sandbox Code Playgroud)
编译(带有-O标志)并运行代码,并对该函数进行10次迭代,我获得以下执行时间:
38.2066878s
4.3696857s
1.3367605s
0.9950032s
0.9399968s
0.9039936s
0.9079987s
0.9119587s
0.9090151s
0.8749654s
Run Code Online (Sandbox Code Playgroud)
为什么第一次迭代(以及第二次迭代)要花费更多的时间,而随后的迭代却要快得多?
我有一些看起来像这样的代码:
retryOnTimeout :: IO a -> IO a
retryOnTimeout action = catch action $ \ResponseTimeout ->
do putStrLn "Timed out. Trying again."
threadDelay 5000000
action
Run Code Online (Sandbox Code Playgroud)
问题是HttpException还有很多其他的构造函数,我想一般再试一次,不管错误到底是什么.现在,如果我替换ResponseTimeout为_那么我将得到一个编译错误,因为它不能推断出异常的类型.
我真的不想为异常处理程序提供类型签名.
我知道这不是太多的重复,而是增加了对的情况下_,因为它好像是说觉得不妥:如果异常是ResponseTimeout然后做X,但如果异常是什么都做同样的事情.是否有一种使用通配符的简洁方法,但仍然让编译器知道它是哪种类型?
postgresql-simple 提供用于流式查询的功能,例如
fold
:: (FromRow row, ToRow params)
=> Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a
Run Code Online (Sandbox Code Playgroud)
我想创建一个充分利用流媒体的管道源.
mySource :: (FromRow row, Monad m) => Source m row
Run Code Online (Sandbox Code Playgroud)
不幸的是,因为IO出现在一个逆变的位置(我认为?)fold,我真的很难与这些类型斗争.以下类型检查,但在产生值之前折叠整个流.
getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
where
foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
foo cond evt …Run Code Online (Sandbox Code Playgroud)