rrm*_*ley 4 scala scalaz scalaz-stream
我在代码中看到了一个常见的模式.我已经从数据库中排序结果,我需要在嵌套结构中发出它们.我想要这个流,所以我想一次在内存中记录少量记录.使用TravesableLike.groupBy假设数据没有排序,所以它不必要地填充可变映射.我想保持这种真正的流媒体.scalaz-stream在这里有用吗?
val sql = """select grandparent_id, parent_id, child_id
from children
where grandparent_id = ?
order by grandparent_id, parent_id, child_id"""
def elementsR[P, R](invoker: Invoker[P, R], param: P): Process[Task, R] =
// Invoker.elements returns trait CloseableIterator[+T] extends Iterator[T] with Closeable
resource(Task.delay(invoker.elements(param)))(
src => Task.delay(src.close)) { src =>
Task.delay { if (src.hasNext) src.next else throw End }
}
def dbWookie {
// grandparent_id, (grandparent_id, parent_id, child_id)
val invoker = Q.query[Int, (Int, Int, Int)](sql)
val es = elementsR(invoker, 42)
// ?, ?, ?
// nested emits (42, ((35, (1, 3, 7)), (36, (8, 9, 12))))
}
Run Code Online (Sandbox Code Playgroud)
我没有在Process上看到像foldLeft和scanLeft这样的函数太多,所以我不确定如何检测grandparent_id,parent_id或child_id何时更改并发出一个组.有任何想法吗?
我想你想要一些类似的东西chunkBy.chunkBy每当谓词函数的结果翻转true到时,就会发出一个块false.
您可以通过比较布尔值,比较输入的某些任意函数的结果来概括这一点.因此,只要应用于输入的此函数的值发生更改,您就会有一个发出块的进程:
def chunkOn[I, A](f: I => A): Process1[I, Vector[I]] = {
def go(acc: Vector[I], last: A): Process1[I,Vector[I]] =
await1[I].flatMap { i =>
val cur = f(i)
if (cur != last) emit(acc) then go(Vector(i), cur)
else go(acc :+ i, cur)
} orElse emit(acc)
await1[I].flatMap(i => go(Vector(i), f(i)))
}
Run Code Online (Sandbox Code Playgroud)
REPL中的快速脏测试,使用Identity monad立即强制评估:
scala> import scalaz.stream._, scalaz.Id._
import scalaz.stream._
import scalaz.Id._
scala> val rows = Seq(('a, 'b, 'c), ('a, 'b, 'd), ('b, 'a, 'c), ('b, 'd, 'a))
rows: Seq[(Symbol, Symbol, Symbol)] = List(('a,'b,'c), ('a,'b,'d), ('b,'a,'c), ('b,'d,'a))
scala> val process = Process.emitSeq[Id, (Symbol, Symbol, Symbol)](rows)
process: scalaz.stream.Process[scalaz.Id.Id,(Symbol, Symbol, Symbol)] =
Emit(List(('a,'b,'c), ('a,'b,'d), ('b,'a,'c), ('b,'d,'a)),Halt(scalaz.stream.Process$End$))
scala> process |> chunkOn(_._1)
res4: scalaz.stream.Process[scalaz.Id.Id,scala.collection.immutable.Vector[(Symbol, Symbol, Symbol)]] =
Emit(List(Vector(('a,'b,'c), ('a,'b,'d))),Emit(List(Vector(('b,'a,'c), ('b,'d,'a))),Halt(scalaz.stream.Process$End$)))
Run Code Online (Sandbox Code Playgroud)
正如您所建议的那样,chunkWhen对当前值和最后一个值使用谓词,并在求值时发出一个块false.
def chunkWhen[I](f: (I, I) => Boolean): Process1[I, Vector[I]] = {
def go(acc: Vector[I]): Process1[I,Vector[I]] =
await1[I].flatMap { i =>
acc.lastOption match {
case Some(last) if ! f(last, i) => emit(acc) then go(Vector(i))
case _ => go(acc :+ i)
}
} orElse emit(acc)
go(Vector())
}
Run Code Online (Sandbox Code Playgroud)
尝试一下:
scala> process |> chunkWhen(_._1 == _._1)
res0: scalaz.stream.Process[scalaz.Id.Id,Vector[(Symbol, Symbol, Symbol)]] =
Emit(List(Vector(('a,'b,'c), ('a,'b,'d))),Emit(List(Vector(('b,'a,'c), ('b,'d,'a))),Halt(scalaz.stream.Process$End$)))
Run Code Online (Sandbox Code Playgroud)