标签: iterate

Scalaz迭代:"提升"`EnumeratorT`以匹配`IterateeT`为"更大"的monad

如果我有一个EnumeratorT和一个相应的IterateeT我可以一起运行它们:

val en: EnumeratorT[String, Task] = EnumeratorT.enumList(List("a", "b", "c"))
val it: IterateeT[String, Task, Int] = IterateeT.length

(it &= en).run : Task[Int]
Run Code Online (Sandbox Code Playgroud)

如果枚举器monad比iteratee monad"更大",我可以使用up或者更一般Hoist地"提升"iteratee以匹配:

val en: EnumeratorT[String, Task] = ...
val it: IterateeT[String, Id, Int] = ...

val liftedIt = IterateeT.IterateeTMonadTrans[String].hoist(
  implicitly[Task |>=| Id]).apply(it)
(liftedIt &= en).run: Task[Int]
Run Code Online (Sandbox Code Playgroud)

但是,当iteratee monad比枚举器monad"更大"时,我该怎么办?

val en: EnumeratorT[String, Id] = ...
val it: IterateeT[String, Task, Int] = ...

it &= ???
Run Code Online (Sandbox Code Playgroud)

似乎没有任何Hoist实例EnumeratorT,也没有任何明显的"提升"方法.

monads scala enumerator scalaz iterate

445
推荐指数
1
解决办法
1万
查看次数

使用Scalaz 7 zipWithIndex/group枚举避免内存泄漏

背景

正如在这个问题中所指出的,我正在使用Scalaz 7迭代器来处理恒定堆空间中的大型(即无界)数据流.

我的代码看起来像这样:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))
Run Code Online (Sandbox Code Playgroud)

问题

我似乎遇到了内存泄漏,但我对Scalaz/FP不太熟悉,不知道这个bug是在Scalaz中还是在我的代码中.直观地说,我希望这段代码只需要(大约为)P倍的Chunk-size空间.

注:我发现了一个类似的问题,其中OutOfMemoryError遇到,但我的代码没有使用consume.

测试

我跑了一些测试试图找出问题.总而言之,只有在使用zipWithIndex和泄漏时才会出现泄漏group.

// no zipping/grouping …
Run Code Online (Sandbox Code Playgroud)

scala scalaz iterate

106
推荐指数
1
解决办法
3354
查看次数

使用Scalaz Stream解析任务(替换Scalaz Iteratees)

介绍

我在许多项目中使用Scalaz 7的迭代,主要用于处理大型文件.我想开始切换到Scalaz ,这些旨在取代iteratee包(坦率地说它缺少很多部分并且使用起来很麻烦).

Streams基于机器(iteratee理念的另一种变体),它也已在Haskell中实现.我已经使用了Haskell机器库,但机器和流之间的关系并不是完全明显的(至少对我来说),并且流库的文档仍然有点稀疏.

这个问题是关于一个简单的解析任务,我希望看到使用流而不是迭代来实现.如果没有其他人能打败我,我会自己回答这个问题,但我确信我不是唯一一个正在(甚至考虑)这种转变的人,因为无论如何我需要完成这项工作,想我不妨在公共场合做这件事.

任务

假设我有一个包含已被标记化并用词性标记的句子的文件:

no UH
, ,
it PRP
was VBD
n't RB
monday NNP
. .

the DT
equity NN
market NN
was VBD
illiquid JJ
. .
Run Code Online (Sandbox Code Playgroud)

每行有一个标记,单词和词性由单个空格分隔,空白行表示句子边界.我想解析这个文件并返回一个句子列表,我们也可以将它们表示为字符串元组的列表:

List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.)
Run Code Online (Sandbox Code Playgroud)

像往常一样,如果我们遇到无效输入或文件读取异常,我们希望优雅地失败,我们不想担心手动关闭资源等.

迭代解决方案

首先是一些常规文件读取的东西(它应该是iteratee包的一部分,它目前不提供远程高级别的任何东西):

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO
import iteratee.{ Iteratee => I, _ …
Run Code Online (Sandbox Code Playgroud)

scala scalaz iterate transducer-machines scalaz-stream

48
推荐指数
1
解决办法
3987
查看次数

Scala流媒体库差异(Reactive Streams/Iteratee/RxScala/Scalaz ...)

我正在关注Coursera的Scala课程中的功能反应编程,我们处理RxScala Observables(基于RxJava).

据我所知,Play Iteratee的库看起来有点像RxScala Observables,其中Observables有点像Enumerators和Observers有点像Iteratees.

还有Scalaz Stream库,也许还有其他一些?


所以我想知道所有这些库之间的主要区别.在哪种情况下,一个可能比另一个更好?


PS:我想知道为什么Play Iteratees库没有被Martin Odersky选择用于他的课程,因为Play在Typesafe堆栈中.这是否意味着Martin喜欢RxScala而不是Play Iteratees?


编辑:无流举措刚刚宣布,作为一种尝试standardize a common ground for achieving statically typed, high-performance, low latency, asynchronous streams of data with built-in non-blocking back pressure

scala iterate playframework-2.0 rx-java scalaz-stream

32
推荐指数
2
解决办法
2680
查看次数

播放2.x:使用Iteratees上传活动文件

我将从问题开始:如何使用Scala API Iteratee将文件上传到云存储(在我的情况下是Azure Blob存储,但我认为它现在不是最重要的)

背景:

我需要将输入分块为大约1 MB的块,用于存储大型媒体文件(300 MB +)作为Azure BlockBlobs.不幸的是,我的Scala知识仍然很差(我的项目是基于Java的,其中Scala的唯一用途是上传控制器).

我尝试使用这段代码:为什么调用错误或在BodyParser的Iteratee中完成请求在Play Framework 2.0中挂起?(作为Input Iteratee) - 它工作得很好,但Element我可以使用的每个大小为8192字节,因此它太小,无法向云端发送一百兆字节的文件.

我必须说这对我来说是一种全新的方法,而且很可能是我误解了一些东西(不想告诉我,我误解了一切;>)

我会感谢任何提示或链接,这将有助于我这个主题.如果有任何相似用途的样本,那么对我来说这是最好的选择.

scala file-upload azure-storage iterate playframework-2.0

28
推荐指数
1
解决办法
5556
查看次数

iteratee的简介或简单示例?

我发现Oleg关于Iteratee的文档有点难以理解.特别是因为他在Haskell-Cafe的帖子中的一些功能不在iteratee库中(比如enum_file).

有没有对iteratee有一个很好的介绍,通过打开文件/套接字,读取和处理数据等基础知识.

io haskell iterate

26
推荐指数
3
解决办法
3560
查看次数

在Play 2.0中无法理解Iteratee,Enumerator,Enumeratee

我刚刚开始学习Play 2.0 Framework.我无法理解的一件事是游戏教程中描述的Iteratee,Enumerator和Enumeratee模式.我对函数式语言的经验很少.

这种模式有什么作用?

它如何帮助我编写非阻塞/反应代码?

一些简单的例子会有帮助.

scala playframework iterate playframework-2.0

26
推荐指数
1
解决办法
7060
查看次数

Haskell iteratee:剥离尾随空格的简单工作示例

我试图了解如何在Haskell中使用iteratee库.到目前为止,我所看到的所有文章似乎都专注于建立一个如何构建迭代的直觉,这是有帮助的,但现在我想要下来并实际使用它们,我觉得有点海上.查看iteratees的源代码对我来说价值有限.

假设我有这个函数修剪一行的尾随空格:

import Data.ByteString.Char8

rstrip :: ByteString -> ByteString
rstrip = fst . spanEnd isSpace
Run Code Online (Sandbox Code Playgroud)

我想做的是:将它变成一个iteratee,读取一个文件并将其写在其他地方,并从每一行中删除尾随空格.我将如何使用iteratees进行结构化?我看到有一个enumLinesBS在Data.Iteratee.Char功能,我可以下探到这一点,但我不知道我是否应该使用mapChunksconvStream或如何重新包装上面的功能于iteratee.

iteration haskell bytestring iterate

19
推荐指数
1
解决办法
914
查看次数

如果枚举器尝试消耗输入会发生什么?

定义Enumerator是:

type Enumerator a m b = Step a m b -> Iteratee a m b
Run Code Online (Sandbox Code Playgroud)

该文档指出,虽然Iteratees comsume数据,Enumerators产生它.我可以理解如何使用这种类型生成数据:

enumStream :: (Monad m) => Stream a -> Enumerator a m b
enumStream stream step =
    case step of
        Continue k -> k stream
        _          -> returnI step  -- Note: 'stream' is discarded
Run Code Online (Sandbox Code Playgroud)

(enumEOF比这更复杂......这显然是检查,以确保IterateeContinue被发出后EOF,抛如果它的错误.)

即,Iterateea Step运行时生成a runIteratee.这Step是再喂到我的枚举,其与供应它Stream,使其能够继续.我的枚举器返回结果延续.

有一点让我很突出:这个代码在Iterateemonad中运行.这意味着它可以消耗数据,对吗? …

haskell iterate

19
推荐指数
1
解决办法
374
查看次数

Iteratees和FRP之间有什么联系?

在我看来,这两个想法之间存在着密切的联系.我的猜测是,如果有一种方法可以用迭代器表示任意图形,那么FRP可以用Iteratees来实现.但是afaik他们只支持链式结构.

有人可以对此有所了解吗?

haskell frp iterate conduit haskell-pipes

19
推荐指数
2
解决办法
864
查看次数