如果我有一个EnumeratorT和一个相应的IterateeT我可以一起运行它们:
val en: EnumeratorT[String, Task] = EnumeratorT.enumList(List("a", "b", "c"))
val it: IterateeT[String, Task, Int] = IterateeT.length
(it &= en).run : Task[Int]
Run Code Online (Sandbox Code Playgroud)
如果枚举器monad比iteratee monad"更大",我可以使用up或者更一般Hoist地"提升"iteratee以匹配:
val en: EnumeratorT[String, Task] = ...
val it: IterateeT[String, Id, Int] = ...
val liftedIt = IterateeT.IterateeTMonadTrans[String].hoist(
implicitly[Task |>=| Id]).apply(it)
(liftedIt &= en).run: Task[Int]
Run Code Online (Sandbox Code Playgroud)
但是,当iteratee monad比枚举器monad"更大"时,我该怎么办?
val en: EnumeratorT[String, Id] = ...
val it: IterateeT[String, Task, Int] = ...
it &= ???
Run Code Online (Sandbox Code Playgroud)
似乎没有任何Hoist实例EnumeratorT,也没有任何明显的"提升"方法.
背景
正如在这个问题中所指出的,我正在使用Scalaz 7迭代器来处理恒定堆空间中的大型(即无界)数据流.
我的代码看起来像这样:
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]
def processChunk(c: Chunk, idx: Long): Result
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
rs ++ vs map {
case (c, i) => processChunk(c, i)
}
} &= (data.zipWithIndex mapE Iteratee.group(P))
Run Code Online (Sandbox Code Playgroud)
问题
我似乎遇到了内存泄漏,但我对Scalaz/FP不太熟悉,不知道这个bug是在Scalaz中还是在我的代码中.直观地说,我希望这段代码只需要(大约为)P倍的Chunk-size空间.
注:我发现了一个类似的问题,其中OutOfMemoryError遇到,但我的代码没有使用consume.
测试
我跑了一些测试试图找出问题.总而言之,只有在使用zipWithIndex和泄漏时才会出现泄漏group.
// no zipping/grouping …Run Code Online (Sandbox Code Playgroud) 我在许多项目中使用Scalaz 7的迭代,主要用于处理大型文件.我想开始切换到Scalaz 流,这些流旨在取代iteratee包(坦率地说它缺少很多部分并且使用起来很麻烦).
Streams基于机器(iteratee理念的另一种变体),它也已在Haskell中实现.我已经使用了Haskell机器库,但机器和流之间的关系并不是完全明显的(至少对我来说),并且流库的文档仍然有点稀疏.
这个问题是关于一个简单的解析任务,我希望看到使用流而不是迭代来实现.如果没有其他人能打败我,我会自己回答这个问题,但我确信我不是唯一一个正在(甚至考虑)这种转变的人,因为无论如何我需要完成这项工作,想我不妨在公共场合做这件事.
假设我有一个包含已被标记化并用词性标记的句子的文件:
no UH
, ,
it PRP
was VBD
n't RB
monday NNP
. .
the DT
equity NN
market NN
was VBD
illiquid JJ
. .
Run Code Online (Sandbox Code Playgroud)
每行有一个标记,单词和词性由单个空格分隔,空白行表示句子边界.我想解析这个文件并返回一个句子列表,我们也可以将它们表示为字符串元组的列表:
List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.)
Run Code Online (Sandbox Code Playgroud)
像往常一样,如果我们遇到无效输入或文件读取异常,我们希望优雅地失败,我们不想担心手动关闭资源等.
首先是一些常规文件读取的东西(它应该是iteratee包的一部分,它目前不提供远程高级别的任何东西):
import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO
import iteratee.{ Iteratee => I, _ …Run Code Online (Sandbox Code Playgroud) 我正在关注Coursera的Scala课程中的功能反应编程,我们处理RxScala Observables(基于RxJava).
据我所知,Play Iteratee的库看起来有点像RxScala Observables,其中Observables有点像Enumerators和Observers有点像Iteratees.
还有Scalaz Stream库,也许还有其他一些?
所以我想知道所有这些库之间的主要区别.在哪种情况下,一个可能比另一个更好?
PS:我想知道为什么Play Iteratees库没有被Martin Odersky选择用于他的课程,因为Play在Typesafe堆栈中.这是否意味着Martin喜欢RxScala而不是Play Iteratees?
编辑:在无流举措刚刚宣布,作为一种尝试standardize a common ground for achieving statically typed, high-performance, low latency, asynchronous streams of data with built-in non-blocking back pressure
我将从问题开始:如何使用Scala API Iteratee将文件上传到云存储(在我的情况下是Azure Blob存储,但我认为它现在不是最重要的)
背景:
我需要将输入分块为大约1 MB的块,用于存储大型媒体文件(300 MB +)作为Azure BlockBlobs.不幸的是,我的Scala知识仍然很差(我的项目是基于Java的,其中Scala的唯一用途是上传控制器).
我尝试使用这段代码:为什么调用错误或在BodyParser的Iteratee中完成请求在Play Framework 2.0中挂起?(作为Input Iteratee) - 它工作得很好,但Element我可以使用的每个大小为8192字节,因此它太小,无法向云端发送一百兆字节的文件.
我必须说这对我来说是一种全新的方法,而且很可能是我误解了一些东西(不想告诉我,我误解了一切;>)
我会感谢任何提示或链接,这将有助于我这个主题.如果有任何相似用途的样本,那么对我来说这是最好的选择.
我发现Oleg关于Iteratee的文档有点难以理解.特别是因为他在Haskell-Cafe的帖子中的一些功能不在iteratee库中(比如enum_file).
有没有对iteratee有一个很好的介绍,通过打开文件/套接字,读取和处理数据等基础知识.
我刚刚开始学习Play 2.0 Framework.我无法理解的一件事是游戏教程中描述的Iteratee,Enumerator和Enumeratee模式.我对函数式语言的经验很少.
这种模式有什么作用?
它如何帮助我编写非阻塞/反应代码?
一些简单的例子会有帮助.
我试图了解如何在Haskell中使用iteratee库.到目前为止,我所看到的所有文章似乎都专注于建立一个如何构建迭代的直觉,这是有帮助的,但现在我想要下来并实际使用它们,我觉得有点海上.查看iteratees的源代码对我来说价值有限.
假设我有这个函数修剪一行的尾随空格:
import Data.ByteString.Char8
rstrip :: ByteString -> ByteString
rstrip = fst . spanEnd isSpace
Run Code Online (Sandbox Code Playgroud)
我想做的是:将它变成一个iteratee,读取一个文件并将其写在其他地方,并从每一行中删除尾随空格.我将如何使用iteratees进行结构化?我看到有一个enumLinesBS在Data.Iteratee.Char功能,我可以下探到这一点,但我不知道我是否应该使用mapChunks或convStream或如何重新包装上面的功能于iteratee.
定义Enumerator是:
type Enumerator a m b = Step a m b -> Iteratee a m b
Run Code Online (Sandbox Code Playgroud)
该文档指出,虽然Iteratees comsume数据,Enumerators产生它.我可以理解如何使用这种类型生成数据:
enumStream :: (Monad m) => Stream a -> Enumerator a m b
enumStream stream step =
case step of
Continue k -> k stream
_ -> returnI step -- Note: 'stream' is discarded
Run Code Online (Sandbox Code Playgroud)
(enumEOF比这更复杂......这显然是检查,以确保Iteratee不Continue被发出后EOF,抛如果它的错误.)
即,Iterateea Step运行时生成a runIteratee.这Step是再喂到我的枚举,其与供应它Stream,使其能够继续.我的枚举器返回结果延续.
有一点让我很突出:这个代码在Iterateemonad中运行.这意味着它可以消耗数据,对吗? …
在我看来,这两个想法之间存在着密切的联系.我的猜测是,如果有一种方法可以用迭代器表示任意图形,那么FRP可以用Iteratees来实现.但是afaik他们只支持链式结构.
有人可以对此有所了解吗?
iterate ×10
scala ×6
haskell ×4
scalaz ×3
bytestring ×1
conduit ×1
enumerator ×1
file-upload ×1
frp ×1
io ×1
iteration ×1
monads ×1
rx-java ×1