标签: conduit

不确定交错管道的来源

我希望看到源的非确定性交错操作,类型签名如

interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)
Run Code Online (Sandbox Code Playgroud)

用例是我有一个p2p应用程序,它保持与网络上许多节点的开放连接,而且它主要是坐在等待来自其中任何节点的消息.当消息到达时,它不关心它来自何处,但需要尽快处理消息.理论上,这种应用程序(至少用于类似套接字的源)可以完全绕过GHC的IO管理器并运行select/ epoll/ etc.直接打电话,但我并不特别在意它是如何实现的,只要它有效.

管道可以这样吗?一种不那么通用但可能更可行的方法可能是编写一个[(k, Socket)] -> Source m (k, ByteString)处理所有套接字接收的函数.

我注意到ResumableSource管道中的操作,但它们似乎都想要了解一个特定的Sink,这感觉就像一个抽象泄漏,至少对于这个操作.

haskell conduit

5
推荐指数
1
解决办法
340
查看次数

使用管道的顺序二进制数据解码

目标是使用具有以下类型签名的管道

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
Run Code Online (Sandbox Code Playgroud)

管道应重复解析ByteString -> a通过TCP/IP(使用network-conduit包)接收的协议缓冲区(使用该功能).

有线消息格式是

{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
Run Code Online (Sandbox Code Playgroud)

(花括号不是协议的一方,仅用于分隔实体).

第一个想法是使用sequenceSink重复应用Sink能够解析一个ProtoBuf:

[...]
import qualified Data.Binary         as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util   as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lenBytes <- CB.take 4                                -- read protobuf …
Run Code Online (Sandbox Code Playgroud)

haskell protocol-buffers conduit

5
推荐指数
1
解决办法
613
查看次数

conduit和network-conduit:结合MonadResource和IO

我正在试验导管包.我还找到了network-conduit包,我试着建立一个简单的TCP客户端,它将文件的内容发送到套接字:

import Data.Conduit
import Data.Conduit.Binary
import Data.Conduit.Network
import Data.ByteString.Char8 (pack)

sendFile fileName appData = runResourceT $ 
   sourceFile fileName $$ appSink appData

main = runTCPClient (clientSettings 8000 (pack "localhost")) (sendFile "book.tex")
Run Code Online (Sandbox Code Playgroud)

但是,这不起作用,因为app sink不属于ResourceT:

[1 of 1] Compiling Main             ( Conduit2.hs, interpreted )

Conduit2.hs:9:63:
    Occurs check: cannot construct the infinite type: m0 = ResourceT m0
    Expected type: Application (ResourceT m0)
      Actual type: AppData (ResourceT m0) -> m0 ()
    In the return type of a call of `sendFile'
    In the second argument of …
Run Code Online (Sandbox Code Playgroud)

haskell conduit

5
推荐指数
1
解决办法
333
查看次数

管道流动中的并行处理

我非常喜欢管道/管道的概念,用于将操作应用于流式IO源.我感兴趣的是构建适用于非常大的日志文件的工具.从Python/Ruby迁移到Haskell的一个吸引人的地方是编写并行代码的简单方法,但我找不到任何这方面的文档.我怎么能设置一个管道流来读取文件中的行并且并行处理它们(即有8个核,它应该读取8行,然后将它们移交给8个不同的线程进行处理,然后再次收集等),理想情况下尽可能少的"仪式"......

可选地,可以注意线是否需要按顺序重新加入,如果这可能影响过程的速度?

我确信可以使用Parallel Haskell书中的想法自己拼凑一些东西,但在我看来,在Conduit工作流程中间并行(parmap等)运行纯函数应该非常简单?

haskell conduit

5
推荐指数
1
解决办法
997
查看次数

使用Conduit从ByteString过滤ANSI转义序列

我正在尝试制作一个从ByteStrings过滤ANSI转义码的Conduit.我想出了一个函数,它将ByteString转换为Word8的流,进行过滤,并在最后转换回ByteStream流.

当我在GHCi中使用它时似乎工作正常:

> runConduit $ yield "hello\27[23;1m world" .| ansiFilter .| printC
"hello world"
Run Code Online (Sandbox Code Playgroud)

当我在我的应用程序中使用它时,包含的管道ansiFilter似乎没有通过任何东西.这是完整的来源:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Conduit
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Conduit.TQueue
import Data.Word8 (Word8)
import qualified Data.Word8 as Word8

main :: IO ()
main = do

      queue <- atomically $ newTBQueue 25
      let qSource = sourceTBQueue queue
      atomically $ writeTBQueue queue ("hello" :: ByteString)

      race_
        (putInputIntoQueue queue)
        (doConversionAndPrint qSource)

putInputIntoQueue q …
Run Code Online (Sandbox Code Playgroud)

haskell conduit

5
推荐指数
1
解决办法
195
查看次数

为什么导管和管道不能具有Arrow实例?

在reddit上有一个存档的线程,该线程表示管道基本上不能是箭头b / c箭头需要同步。该线程链接在这里https://www.reddit.com/r/haskell/comments/rq1q5/conduitssinks_and_refactoring_arrows/

我看不到“同步”出现的地方,因为这不是箭头定义的一部分。另外,我在github https://github.com/cmahon/interactive-brokers上偶然发现了这个项目,该项目将管道明确地视为箭头。为了方便起见,我在此处粘贴实例def。我在这里想念什么?

-- The code in this module was provided by Gabriel Gonzalez

{-# LANGUAGE RankNTypes #-}

module Pipes.Edge where

import           Control.Arrow
import           Control.Category (Category((.), id))
import           Control.Monad ((>=>))
import           Control.Monad.Trans.State.Strict (get, put)
import           Pipes
import           Pipes.Core (request, respond, (\>\), (/>/), push, (>~>))
import           Pipes.Internal (unsafeHoist)
import           Pipes.Lift (evalStateP)
import           Prelude hiding ((.), id)

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

instance (Monad …
Run Code Online (Sandbox Code Playgroud)

haskell arrows conduit haskell-pipes

5
推荐指数
1
解决办法
370
查看次数

Haskell的FreeT和协程类型之间有什么关系

Monad.Reader第19期中的“协程管道”文章中,作者定义了一个泛型Coroutine类型:

newtype Coroutine f m a = Coroutine
  { resume :: m (Either (f (Coroutine f m a)) a)
  }
Run Code Online (Sandbox Code Playgroud)

我注意到这种类型FreeTfree包中的类型非常相似:

data FreeF f a b = Pure a | Free (f b)

newtype FreeT f m a = FreeT
  { runFreeT :: m (FreeF f a (FreeT f m a))
  }
Run Code Online (Sandbox Code Playgroud)

看来FreeTCoroutine是同构的。这是从一个映射到另一个的函数:

freeTToCoroutine
  :: forall f m a. (Functor f, Functor m) => FreeT f m a …
Run Code Online (Sandbox Code Playgroud)

haskell coroutine conduit free-monad haskell-pipes

5
推荐指数
1
解决办法
350
查看次数

导管管道执行时间奇怪

我正在测试此递归Haskell函数的性能,该函数重复累加一个无限列表的前100000000个整数(使用Conduit管道),并打印每次执行的经过时间:

import Conduit
import Data.Time.Clock

evaluate_listC 0 = return ()
evaluate_listC i = do
    startTime <- getCurrentTime
    print $ runConduitPure $ yieldMany [1..] .| takeC 100000000 .| sumC
    endTime <- getCurrentTime
    print $ diffUTCTime endTime startTime
    evaluate_listC (i-1)
Run Code Online (Sandbox Code Playgroud)

编译(带有-O标志)并运行代码,并对该函数进行10次迭代,我获得以下执行时间:

38.2066878s
4.3696857s
1.3367605s
0.9950032s
0.9399968s
0.9039936s
0.9079987s
0.9119587s
0.9090151s
0.8749654s
Run Code Online (Sandbox Code Playgroud)

为什么第一次迭代(以及第二次迭代)要花费更多的时间,而随后的迭代却要快得多?

haskell conduit

5
推荐指数
0
解决办法
79
查看次数

如何在使用HTTP管道时干净利落地处理所有可能的错误?

我有一些看起来像这样的代码:

retryOnTimeout :: IO a -> IO a
retryOnTimeout action = catch action $ \ResponseTimeout -> 
                            do putStrLn "Timed out. Trying again."
                               threadDelay 5000000
                               action 
Run Code Online (Sandbox Code Playgroud)

问题是HttpException还有很多其他的构造函数,我想一般再试一次,不管错误到底是什么.现在,如果我替换ResponseTimeout_那么我将得到一个编译错误,因为它不能推断出异常的类型.

我真的不想为异常处理程序提供类型签名.

我知道这不是太多的重复,而是增加了对的情况下_,因为它好像是说觉得不妥:如果异常是ResponseTimeout然后做X,但如果异常是什么都做同样的事情.是否有一种使用通配符的简洁方法,但仍然让编译器知道它是哪种类型?

haskell conduit

4
推荐指数
1
解决办法
355
查看次数

使用postgresql-simple创建流媒体管道源

postgresql-simple 提供用于流式查询的功能,例如

fold 
  :: (FromRow row, ToRow params)
  => Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a
Run Code Online (Sandbox Code Playgroud)

我想创建一个充分利用流媒体的管道源.

mySource :: (FromRow row, Monad m) => Source m row
Run Code Online (Sandbox Code Playgroud)

不幸的是,因为IO出现在一个逆变的位置(我认为?)fold,我真的很难与这些类型斗争.以下类型检查,但在产生值之前折叠整个流.

getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
  where
    foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
    foo cond evt …
Run Code Online (Sandbox Code Playgroud)

haskell conduit postgresql-simple

4
推荐指数
1
解决办法
321
查看次数