Dav*_* B. 11 monads functional-programming scala scalaz scalaz7
是否可以在常量堆栈和堆空间中在State monad中执行折叠?或者是一种不同的功能技术更适合我的问题?
接下来的部分将描述问题和激励用例.我正在使用Scala,但Haskell中的解决方案也受到欢迎.
State
Monad填充堆假设Scalaz 7.考虑一下州Monad的monadic折叠.为了避免堆栈溢出,我们将蹦蹦跳跳.
import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor
type S = Int // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad
type R = Int // or some other monoid
val col: Iterable[R] = largeIterableofRs() // defined elsewhere
val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){
(acc: R, x: R) => StateT[Trampoline, S, R] {
s: S => Trampoline.done {
(s + 1, Monoid[R].append(acc, x))
}
}
} run 0 run
// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap. Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.
Run Code Online (Sandbox Code Playgroud)
对于大型集合col
,这将填满堆.
我相信在折叠过程中,会为集合中的每个值(x: R
参数)创建一个闭包(State mobit ),填充堆.run 0
在执行初始状态之前,这些都不能被评估.
可以避免这种O(n)堆使用吗?
更具体地说,是否可以在折叠之前提供初始状态,以便状态monad可以在每次绑定期间执行,而不是嵌套闭包以供以后评估?
或者折叠是否可以构造成在状态monad之后懒洋洋地执行run
?通过这种方式,x: R
直到先前的那些被评估并且适合于垃圾收集之后才会创建下一个闭包.
或者这种工作有更好的功能范例吗?
但也许我正在使用错误的工具来完成工作.下面是一个示例用例的演变.我在这里走错了路吗?
考虑储层采样,即k
从一个集合中挑选一个过大的统一随机项目,以适应内存.在Scala中,这样的功能可能是
def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]
Run Code Online (Sandbox Code Playgroud)
如果拉皮条入TraversableOnce
类型可以像这样使用
val tenRandomInts = (Int.Min to Int.Max) sample 10
Run Code Online (Sandbox Code Playgroud)
完成的工作sample
基本上是fold
:
def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}
Run Code Online (Sandbox Code Playgroud)
但是,update
是有状态的; 这取决于n
已经看过的物品数量.(它也取决于RNG,但为了简单起见,我认为它是全局的和有状态的.用于处理的技术n
将会非常简单地延伸.).那么如何处理这种状态呢?
不纯的解决方案很简单,并且使用不断的堆栈和堆运行.
/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
var n = 0
def apply(sample: Vector[A], x: A): Vector[A] = {
n += 1
algorithmR(k, n, acc, x)
}
}
def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
if (sample.size < k) {
sample :+ x // must keep first k elements
} else {
val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
if (r <= k)
sample.updated(r - 1, x) // sample is 0-index
else
sample
}
}
Run Code Online (Sandbox Code Playgroud)
但是纯功能解决方案呢? update
必须将其n
作为附加参数并将新值与更新的样本一起返回.我们可以n
在隐式状态中包括折叠累加器,例如,
(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2
Run Code Online (Sandbox Code Playgroud)
但这掩盖了意图; 我们只是打算积累样本矢量.这个问题似乎已经为国家monad和monadic left fold做好了准备.让我们再试一次.
我们将使用Scalaz 7,这些导入
import scalaz._
import Scalaz._
import scalaz.std.iterable_
Run Code Online (Sandbox Code Playgroud)
并在一个操作Iterable[A]
中,由于Scalaz不支持的一个monadic折叠Traversable
.
sample
现在已定义
// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
type M[B] = State[Int, B]
// foldLeftM is implemented using foldRight, which must reverse `col`, blowing
// the heap for large `col`. Ignore this issue for now.
// foldLeftM could be implemented differently or we could switch to
// foldRightM, implemented using foldLeft.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}
Run Code Online (Sandbox Code Playgroud)
更新的地方
// update using State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => State[Int, Vector[A]] {
n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
}
}
Run Code Online (Sandbox Code Playgroud)
不幸的是,这会在大型集合上砸堆栈.
让我们蹦蹦跳跳吧.sample
就是现在
// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B]
type M[B] = TrampolinedState[Int, B]
// Same caveat about foldLeftM using foldRight and blowing the heap
// applies here. Ignore for now. This solution blows the heap anyway;
// let's fix that issue first.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}
Run Code Online (Sandbox Code Playgroud)
更新的地方
// update using trampolined State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
}
}
Run Code Online (Sandbox Code Playgroud)
这可以修复堆栈溢出,但仍会为非常大的集合(或非常小的堆)吹出堆.在折叠期间创建集合中每个值的一个匿名函数(我相信关闭每个x: A
参数),在蹦床运行之前消耗堆.(FWIW,State版本也有这个问题;堆栈溢出只是首先出现较小的集合.)
我们真正的问题是未经执行的国家机构使用的堆.
不它不是.真正的问题是该集合不适合内存,foldLeftM
并foldRightM
强制整个集合.不纯的解决方案的副作用是你可以随时释放内存.在"纯功能"解决方案中,您不会在任何地方这样做.
您的使用Iterable
忽略了一个至关重要的细节:col
实际上是什么样的集合,如何创建它的元素以及如何丢弃它们.所以,一定,不foldLeftM
上Iterable
.它可能过于严格,你正在强迫整个集合进入内存.例如,如果它是a Stream
,那么只要你坚持col
到目前为止所有强制的元素将在内存中.如果它是其他一种Iterable
不会忘记其元素的懒惰,那么折叠仍然过于严格.
我尝试了你的第一个例子,EphemeralStream
没有看到任何明显的堆压力,即使它显然会有相同的"未执行的状态暴徒".不同之处在于,EphemeralStream
s的元素被弱引用,并且它foldRight
不会强制整个流.
我怀疑如果你使用了Foldable.foldr
,那么你就不会看到有问题的行为,因为它在第二个参数中使用了一个懒惰的函数进行折叠.当您调用折叠时,您希望它立即返回看起来像这样的悬架:
Suspend(() => head |+| tail.foldRightM(...))
Run Code Online (Sandbox Code Playgroud)
当蹦床恢复第一次暂停并运行到下一次暂停时,悬架之间的所有分配将可由垃圾收集器释放.
请尝试以下方法:
def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
if (bs.isEmpty) Monad[M].point(a)
else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))
val MS = StateT.stateTMonadState[Int, Trampoline]
import MS._
foldM[M,R,Int](Monoid[R].zero, col) {
(x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
} run 0 run
Run Code Online (Sandbox Code Playgroud)
这将在一个trampolined monad的常量堆中运行M
,但是会为非trampolined monad溢出堆栈.
但真正的问题是Iterable
对于太大而无法容纳在内存中的数据而言,这不是一个好的抽象.当然,您可以编写一个命令式的副作用程序,在每次迭代后显式丢弃元素或使用惰性右折叠.这很有效,直到你想用另一个程序组成该程序.而且我假设你开始研究在State
monad中进行调查的全部原因是为了获得组合性.
所以,你可以做什么?以下是一些选项:
Reducer
,Monoid
以及它们的组合物,然后在必要显式释放环(或trampolined懒惰右倍)作为运行最后一步,之后组合物是不可能的或预期的.Iteratee
成分和monadic Enumerator
来喂它们.这些选项中的最后一个是我将在一般情况下使用和推荐的选项.